你必定看得懂的 DDD+CQRS+EDA+ES 核心思想與極簡可運行代碼示例

前言

       隨着分佈式架構微服務的興起,DDD(領域驅動設計)、CQRS(命令查詢職責分離)、EDA(事件驅動架構)、ES(事件溯源)等概念也一併成爲時下的火熱概念,我也在早些時候閱讀了一些大佬的分析文,學習相關概念,不過一直有種霧裏看花、似懂非懂的感受。通過一段時間的學習和研究大佬的代碼後,本身設計實現了一套我消化理解後的代碼。爲了突出重點,避免受到大量實現細節的干擾,固然也是懶(這纔是主要緣由),其中的全部基礎設施都使用了現成的庫。所實現的研究成果也作成了傻瓜式一鍵體驗(我對對着黑框框敲命令沒什麼興趣,能點兩下鼠標搞定的事我毫不在鍵盤上敲又臭又長的命令,敲命令能敲出優越感的人我以爲應該是抖M)。html

正文

DDD(領域驅動設計)

       這必定是最羣魔亂舞的一個概念,每一個大佬都能講出一大篇演講稿,但都或多或少存在差別或分歧,在我初看 DDD 時,我就被整懵了,這究竟是咋回事?git

       如今回過頭來看,DDD 實際上是一個高階思想概念,並不能指導開發者如何敲鍵盤,是指導人如何思考領域問題,而不是指導人思考出具體的領域的。正是由於中間隔了一層虛幻飄渺的概念,致使不一樣的人得出了不一樣的結論。還好 DDD 存在一些比較具體容易落實的概念,如今就來說下我對這些常見基礎概念的理解和我編碼時的基本原則,但願你們能在看大佬的文章時不用一臉懵逼,也進行下心得交流。github

Entity(實體)

       實體是一個存儲數據的類,若是類中包含自身的合法性驗證規則之類的方法,通常稱之爲充血模型,相對的單純保存數據的則稱爲貧血模型(有時也叫作 POCO 類)。實體有一個重要性質,相等性是由標識屬性決定的,這個標識能夠是一個簡單的 int 型的 Id,也能夠是多個內部數據的某種組合(相似數據庫表的複合字段主鍵)。除標識外的其餘東西均不對兩個實體對象的相等性產生影響。而且實體的數據屬性是可更改的。數據庫

       有不少大佬認爲實體應該是充血的,但在我看來,貧血的彷佛更好,由於需求的不穩定性可能致使這些規則並不穩定,或規則自己並不惟一,在不一樣場合可能須要不一樣規則。這時候充血模型不管怎麼辦都很彆扭,若是把規則定義和校驗交給外部組件,這些需求就很容易知足,好比使用 FluentValidate 爲一種實體定義多套規則或對內部的規則條目按狀況從新組合。編程

ValueObject(值對象)

       值對象也是用來存儲數據的類。與實體相對,值對象沒有標識屬性,其相等性由全部內部屬性決定,當且僅當兩個值對象實例的全部屬性一一相等時,這兩個值對象相等。而且值對象的全部屬性爲只讀,僅能在構造函數中進行惟一一次設置,若是但願修改某個值對象的某一屬性,惟一的辦法是使用新的值對象替換舊的值對象。而且值對象常常做爲實體的屬性存在。json

      這個概念看起來和實體特別類似,都是用來存儲數據的,但也有些性質上的根本不一樣。網上的大佬一般會爲值對象編寫基類,但我認爲,值對象和實體在代碼實現上並無這麼大的區別。能夠看做整數和小數在計算機中表現爲不一樣的數據類型,但在數學概念上他們沒有區別,僅僅只是由於離散的計算機系統沒法完美表示連續的數學數字而產生的縫合怪。我傾向於根據類的代碼定義所表現出來的性質與誰相符就將其視爲誰,而不是看實現的接口或繼承的基類。由於需求的不肯定性會致使他們可能會發生轉換,根據代碼進行自我描述來判斷能夠避免不少潛在的麻煩。api

Aggregate,Aggregate Root(聚合及聚合根)

       聚合根表示一個領域所操做的頂級實體類型,其餘附屬數據都是聚合根的內部屬性,聚合根和其所屬的其餘實體的組合稱爲聚合。這是一個純概念性的東西。對領域實體的操做必須從聚合根開始,也就是說確保數據完整性的基本單位是聚合。大佬的代碼中常常會用一個空接口來表示聚合根,若是某個實體實現了這個接口,就表示這個實體能夠是一個聚合根。請注意,聚合根不必定必須是頂級類型,也能夠是其餘實體的一個屬性。這表示一個實體在,某些狀況下是聚合根,而其餘狀況下是另外一個聚合根的內部屬性。也就是說實體之間並不是嚴格的樹狀關係,而是通常有向圖狀關係。架構

       我認爲定義這樣的空接口實際意不大,反而可能形成一些誤會。若是某個實體因爲需求變更致使再也不會成爲聚合根,那這個實體事實上將再也不是聚合根,但人是會犯錯的,極可能忘記去掉聚合根接口,這時代碼與事實將產生矛盾。因此我認爲聚合根應該基於事實而不是代碼。當一個實體再也不會做爲聚合根使用時,將相關代碼刪除,就同時表示它再也不是聚合根,閱讀代碼的人也由於看不到相關代碼而自動認爲它不是聚合根。在代碼中的體現方式與下一個的概念有關。併發

Repository(倉儲)

       倉儲表示對聚合根的持久化的抽象,在代碼上可表現爲聲明瞭增刪查改的相關方法的接口,而倉儲的實現類負責具體解決如何對聚合根實體進行增刪查改。例如在倉儲內部使用數據庫完成具體工做。app

       若是一個倉儲負責管理一個聚合根實體的持久化或者說存取,那這個實體就是一個事實上的聚合根。那麼在這裏,就能夠在代碼操做上將看到某個實體被倉儲管理等價爲這個實體是聚合根,反之就不是。也就是說,若是將某個實體的倉儲的最後一個實際使用代碼刪除,這個實體就在事實上再也不是聚合根,此時代碼表現與事實將完美同步,再也不會產生矛盾。至於因爲沒看到某個實體的倉儲而將實體誤認爲不是聚合根,這其實並無任何問題。這說明在你所關注的領域中這個實體確實不是聚合根,而這個實體可能做爲聚合根使用的領域你根本不關心,因此看不到,那這個實體是否在其餘領域做爲聚合根使用對你而言實際上是無所謂的。

Domain Service(領域服務)

       這就涉及到業務代碼的編寫了。若是一個業務須要由多個聚合根配合完成,也就是須要多個倉儲,那麼就應該將這些對倉儲的調用封裝進一個服務,統一對外暴露提供服務。

       若是這些倉儲操做須要具備事務性,也能夠在這裏進行協調管理。若是某個業務只須要一個倉儲參與,要不要專門封裝一個服務就看你高興了。

CQRS(命令查詢職責分離)

       CQRS 本質上是一種指導思想,指導開發者如何設計一個低耦合高可擴展架構的思想。傳統的 CURD 將對數據的操做分爲 讀、寫、改、刪,將他們封裝在一塊兒致使他們將緊密耦合在相同的數據源中,不利於擴展。CQRS 則將對數據的操做分爲會改變數據源的和不會改變數據源的,前者稱爲命令,後者稱爲查詢。將他們分別封裝能讓他們各自使用不一樣的數據源,提升可擴展性。

       其中命令是一個會改變數據源,但不返回任何值的方法;查詢是會返回值,但毫不會改變數據源的方法。可是在個人編碼中,命令是能夠返回值的,至於要返回什麼,根據實際狀況調整。好比最簡單的返回一個 bool 表示操做是否成功以決定接下來的業務流程該走向何方,這是很常見的狀況。因此在個人概念裏,一個方法是命令仍是查詢實際上只看這個方法是否會改變數據源,要封裝在一塊兒仍是分別封裝都無所謂。建議分開封裝到不一樣的倉儲中,經過倉儲關聯到具體的數據源,命令和查詢的倉儲關聯到不一樣的數據源的時候,天然就完成了讀寫分離。經過起名來明示方法的目的應該能夠輕鬆分辨一個方法屬於命令仍是查詢。只要腦子裏有這個概念,要實現擴展辦法多的是。

事件驅動架構(EDA)

       能夠說全部圖形界面(Gui)編程都是清一色的事件驅動架構,這東西一點也不稀奇。說白了,EDA 就是一種被動架構,經過某些事情的發生來觸發某些操做的執行,不然系統就隨時待命,按兵不動。

       EDA 的實現須要一箇中介才能實現,在 Windows 中,這個東西叫作 Windows 消息隊列(消息循環)和事件處理器。一樣的,在非 Gui 編程中也須要這倆東西,但一般被稱爲消息總線和消息消費者。在分佈式系統中,這個中介將不與系統在同一進程甚至不在同一設備中,稱爲分佈式消息總線。這樣在開發時能夠分紅兩撥,一撥負責寫生產併發送事件的代碼,一撥負責寫接收事件信息並進行處理的代碼。他們之間的溝通僅限於交流關心的事件叫什麼以及事件攜帶了什麼信息。至於產生的消息是如何送到正確的消費端並觸發消費處理器的,那是消息總線的事。若是一個消息總線須要這兩撥人瞭解中間的過程甚至須要本身去實現,那這個消息總線是個廢品,也起不到什麼解耦的效果,甚至是個拖後腿的東西。

EDA + CQRS

       當他們結合在一塊兒,就產生了命令或查詢的發起和實際處理實現能夠分離的效果。命令的發起方向命令總線發送一條命令消息並帶上必要參數,消費方收到消息後獲取參數完成任務並返回結果。命令能夠看做一種特殊的事件,命令只由一個命令處理器處理,並可向發送方返回一個處理結果;事件由全部對同種事件感興趣的事件處理器處理,不向事件發送方返回任何結果。

       事件處理器的執行順序是不肯定的,因此任何事件處理器都必須獨立完成事件處理。若是兩個事件處理之間存在因果依賴,應該在前置事件處理後由事件處理器發佈新事件,並由後置事件處理器去處理前置事件產生的新事件,而不是讓它們處理同一事件。

ES(事件溯源)

       事件溯源表示能追查一個事件的源頭,甚至與之相關的其餘事件的概念,說句大白話就是刨祖墳。ES 對歷史狀態回溯的需求有着自然的支持,最多見的如撤銷重作。而 ES 通常會配合 EDA 使用,ES 保存 EDA 產生的事件信息,而且這些信息有隻讀性和因果連貫性。這順便能讓咱們對系統中的實體到底是如何一步一步變成如今這個樣子有一個清晰的瞭解。畢竟實體具備可變性,實體信息一旦改變,舊的信息就會丟失,ES 恰好彌補了這個缺陷。

代碼展現說明

       此處的事件消息中介使用 MediatR 實現。

接口

       DDD 相關

實體

       定義一個實體的基本要素,實現接口的類就是實體,值對象沒有接口或基類,只看代碼所展示的性質是否符合值對象的定義,聚合根沒有接口或基類,只看實體是否被倉儲使用,領域服務說白了就是個打包封裝,根據狀況來決定,例如重構時提取方法便可視爲封裝服務。在此處可簡單認爲沒有實現實體接口的數據類是值對象:

 1 /// <summary>
 2 /// 實體接口
 3 /// </summary>
 4 public interface IEntity {}
 5 
 6 /// <summary>
 7 /// 泛型實體接口,約束Id屬性
 8 /// </summary>
 9 public interface IEntity<TKey> : IEntity
10     where TKey : IEquatable<TKey>
11 {
12     TKey Id { get; set; }
13 }

倉儲接口

       倉儲接口細分爲可讀倉儲和可寫倉儲,可寫倉儲有一個分支爲可批量提交倉儲,表示修改操做會在調用提交保存方法後批量保存,也就是事務(就是用來替代操做單元的,這東西就有一個提交操做,名字也莫名其妙,我曾經一直沒法理解這東西是幹嗎的),接口聲明參考 EF Core,示例實現也基於 EF Core。因爲已經公開了查詢接口類型的 Set 屬性,使用者能夠任意自定義查詢。

 1     public interface IBulkOperableVariableRepository<TResult, TVariableRepository, TEntity>
 2         where TEntity : IEntity
 3         where TVariableRepository : IVariableRepository<TEntity>
 4     {
 5         TResult SaveChanges();
 6         Task<TResult> SaveChangesAsync(CancellationToken cancellationToken);
 7     }
 8 
 9     public interface IBulkOperableVariableRepository<TVariableRepository, TEntity>
10         where TEntity : IEntity
11         where TVariableRepository : IVariableRepository<TEntity>
12     {
13         void SaveChanges();
14         Task SaveChangesAsync(CancellationToken cancellationToken);
15     }
16 
17     public interface IReadOnlyRepository<TEntity>
18         where TEntity : IEntity
19     {
20         IQueryable<TEntity> Set { get; }
21         TEntity Find(TEntity entity, bool ignoreNullValue);
22         Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue);
23 
24     }
25     public interface IReadOnlyRepository<TEntity, TKey> : IReadOnlyRepository<TEntity>
26         where TEntity : IEntity<TKey>
27         where TKey : IEquatable<TKey>
28     {
29         TEntity Find(TKey key);
30         Task<TEntity> FindAsync(TKey key);
31         IQueryable<TEntity> Find(IEnumerable<TKey> keys);
32     }
33 
34     public interface IVariableRepository<TEntity>
35         where TEntity : IEntity
36     {
37         void Add(TEntity entity);
38         Task AddAsync(TEntity entity, CancellationToken cancellationToken);
39         void Update(TEntity entity);
40         Task UpdateAsync(TEntity entity, CancellationToken cancellationToken);
41         void Delete(TEntity entity, bool isSoftDelete);
42         Task DeleteAsync(TEntity entity, bool isSoftDelete, CancellationToken cancellationToken);
43         void AddRange(IEnumerable<TEntity> entities);
44         Task AddRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken);
45         void UpdateRange(IEnumerable<TEntity> entities);
46         Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken);
47         void DeleteRange(IEnumerable<TEntity> entities, bool isSoftDelete);
48         Task DeleteRangeAsync(IEnumerable<TEntity> entities, bool isSoftDelete, CancellationToken cancellationToken);
49     }
50     public interface IVariableRepository<TEntity, TKey> : IVariableRepository<TEntity>
51         where TEntity : IEntity<TKey>
52         where TKey : IEquatable<TKey>
53     {
54         void Delete(TKey key, bool isSoftDelete);
55         Task DeleteAsync(TKey key, bool isSoftDelete, CancellationToken cancellationToken);
56         void DeleteRange(IEnumerable<TKey> keys, bool isSoftDelete);
57         Task DeleteRangeAsync(IEnumerable<TKey> keys, bool isSoftDelete, CancellationToken cancellationToken);
58     }
59 
60     public interface IRepository<TEntity> : IVariableRepository<TEntity>, IReadOnlyRepository<TEntity>
61         where TEntity : IEntity
62     {
63     }
64 
65     public interface IRepository<TEntity, TKey> : IRepository<TEntity>, IVariableRepository<TEntity, TKey>, IReadOnlyRepository<TEntity, TKey>
66         where TEntity : IEntity<TKey>
67         where TKey : IEquatable<TKey>
68     {
69     }

EF Core 專用特化版倉儲接口

 1     public interface IEFCoreRepository<TEntity, TDbContext> : IReadOnlyRepository<TEntity>, IVariableRepository<TEntity>, IBulkOperableVariableRepository<int, IEFCoreRepository<TEntity, TDbContext>, TEntity>
 2         where TEntity : class, IEntity
 3         where TDbContext : DbContext
 4     { }
 5 
 6     public interface IEFCoreRepository<TEntity, TKey, TDbContext> : IEFCoreRepository<TEntity, TDbContext>, IReadOnlyRepository<TEntity, TKey>, IVariableRepository<TEntity, TKey>
 7         where TEntity : class, IEntity<TKey>
 8         where TKey : IEquatable<TKey>
 9         where TDbContext : DbContext
10     { }

 

       CQRS+EDA 相關:

命令接口

       分爲帶返回值命令和無返回值命令

1 public interface ICommand<out TResult> : ICommand
2 {
3 }
4 
5 public interface ICommand : IMessage
6 {
7 }

命令總線接口

       一樣分爲帶返回值和無返回值

 1 public interface ICommandBus<in TCommand>
 2     where TCommand : ICommand
 3 {
 4     Task SendCommandAsync(TCommand command, CancellationToken cancellationToken);
 5 }
 6 
 7 public interface ICommandBus<in TCommand, TResult> : ICommandBus<TCommand>
 8     where TCommand : ICommand<TResult>
 9 {
10     new Task<TResult> SendCommandAsync(TCommand command, CancellationToken cancellationToken);
11 }

命令處理器接口

       同上

 1 public interface ICommandHandler<in TCommand>
 2     where TCommand : ICommand
 3 {
 4     Task Handle(TCommand command, CancellationToken cancellationToken);
 5 }
 6 
 7 public interface ICommandHandler<in TCommand, TResult> : ICommandHandler<TCommand>
 8     where TCommand : ICommand<TResult>
 9 {
10     new Task<TResult> Handle(TCommand command, CancellationToken cancellationToken);
11 }

命令存儲接口

       可用於歷史命令追溯,返回值可用於返回存儲是否成功或其餘必要信息

 1 public interface ICommandStore
 2 {
 3     void Save(ICommand command);
 4 
 5     Task SaveAsync(ICommand command, CancellationToken cancellationToken);
 6 }
 7 
 8 public interface ICommandStore<TResult> : ICommandStore
 9 {
10     new TResult Save(ICommand command);
11 
12     new Task<TResult> SaveAsync(ICommand command, CancellationToken cancellationToken);
13 }

事件接口

       沒有返回值

1 public interface IEvent : IMessage
2 {
3 }

事件總線接口

       同上

 1 public interface IEventBus
 2 {
 3     void PublishEvent(IEvent @event);
 4 
 5     Task PublishEventAsync(IEvent @event, CancellationToken cancellationToken);
 6 }
 7 
 8 public interface IEventBus<TResult> : IEventBus
 9 {
10     new TResult PublishEvent(IEvent @event);
11 
12     new Task<TResult> PublishEventAsync(IEvent @event, CancellationToken cancellationToken);
13 }

事件處理器接口

       同上

1 public interface IEventHandler<in TEvent>
2     where TEvent : IEvent
3 {
4     Task Handle(TEvent @event, CancellationToken cancellationToken);
5 }

事件存儲接口

       同命令存儲接口

 1 public interface IEventStore
 2 {
 3     void Save(IEvent @event);
 4 
 5     Task SaveAsync(IEvent @event, CancellationToken cancellationToken = default);
 6 }
 7 
 8 public interface IEventStore<TResult> : IEventStore
 9 {
10     new TResult Save(IEvent @event);
11 
12     new Task<TResult> SaveAsync(IEvent @event, CancellationToken cancellationToken = default);
13 }

(命令、事件)消息基礎接口

1 public interface IMessage
2 {
3     Guid Id { get; }
4 
5     DateTimeOffset Timestamp { get; }
6 }

       相關接口定義完畢。

實現

EF Core 泛型倉儲

       未知主鍵的實體使用實體對象爲條件查找時,使用動態生成表達式的方法

  1     public class EFCoreRepository<TEntity, TKey, TDbContext> : EFCoreRepository<TEntity, TDbContext>, IEFCoreRepository<TEntity, TKey, TDbContext>
  2         where TEntity : class, IEntity<TKey>
  3         where TKey : IEquatable<TKey>
  4         where TDbContext : DbContext
  5     {
  6         public EFCoreRepository(TDbContext dbContext) : base(dbContext)
  7         {
  8         }
  9 
 10         public virtual void Delete(TKey key, bool isSoftDelete)
 11         {
 12             var entity = Find(key);
 13             Delete(entity, isSoftDelete);
 14         }
 15 
 16         public virtual Task DeleteAsync(TKey key, bool isSoftDelete, CancellationToken cancellationToken = default)
 17         {
 18             Delete(key, isSoftDelete);
 19             return Task.CompletedTask;
 20         }
 21 
 22         public virtual void DeleteRange(IEnumerable<TKey> keys, bool isSoftDelete)
 23         {
 24             var entities = Find(keys).ToArray();
 25             dbSet.AttachRange(entities);
 26             DeleteRange(entities, isSoftDelete);
 27         }
 28 
 29         public virtual Task DeleteRangeAsync(IEnumerable<TKey> keys, bool isSoftDelete, CancellationToken cancellationToken = default)
 30         {
 31             DeleteRange(keys, isSoftDelete);
 32             return Task.CompletedTask;
 33         }
 34 
 35         public virtual TEntity Find(TKey key)
 36         {
 37             return Set.SingleOrDefault(x => x.Id.Equals(key));
 38         }
 39 
 40         public virtual IQueryable<TEntity> Find(IEnumerable<TKey> keys)
 41         {
 42             return Set.Where(x => keys.Contains(x.Id));
 43         }
 44 
 45         public override TEntity Find(TEntity entity, bool ignoreNullValue)
 46         {
 47             return base.Find(entity, ignoreNullValue);
 48         }
 49 
 50         public virtual Task<TEntity> FindAsync(TKey key)
 51         {
 52             return Set.SingleOrDefaultAsync(x => x.Id.Equals(key));
 53         }
 54 
 55         public override Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue)
 56         {
 57             return base.FindAsync(entity, ignoreNullValue);
 58         }
 59     }
 60 
 61     public class EFCoreRepository<TEntity, TDbContext> : IEFCoreRepository<TEntity, TDbContext>
 62         where TEntity : class, IEntity
 63         where TDbContext : DbContext
 64     {
 65         protected readonly TDbContext dbContext;
 66         protected readonly DbSet<TEntity> dbSet;
 67 
 68         protected virtual void ProcessChangedEntity()
 69         {
 70             var changedEntities = dbContext.ChangeTracker.Entries()
 71                 .Where(x => x.State == EntityState.Added || x.State == EntityState.Modified);
 72             foreach (var entity in changedEntities)
 73             {
 74                 (entity as IOptimisticConcurrencySupported)?.GenerateNewConcurrencyStamp();
 75             }
 76 
 77             var changedEntitiesGroups = changedEntities.GroupBy(x => x.State);
 78             foreach (var group in changedEntitiesGroups)
 79             {
 80                 switch (group)
 81                 {
 82                     case var entities when entities.Key == EntityState.Added:
 83                         foreach (var entity in entities)
 84                         {
 85                             if (entity is IActiveControllable)
 86                             {
 87                                 (entity as IActiveControllable).Active ??= true;
 88                             }
 89                         }
 90                         break;
 91                     case var entities when entities.Key == EntityState.Modified:
 92                         foreach (var entity in entities)
 93                         {
 94                             (entity as IEntity)?.ProcessCreationInfoWhenModified(dbContext);
 95 
 96                             if (entity is IActiveControllable && (entity as IActiveControllable).Active == null)
 97                             {
 98                                 entity.Property(nameof(IActiveControllable.Active)).IsModified = false;
 99                             }
100                         }
101                         break;
102                     default:
103                         break;
104                 }
105             }
106         }
107 
108         protected virtual void ResetDeletedMark(params TEntity[] entities)
109         {
110             foreach (var entity in entities)
111             {
112                 if (entity is ILogicallyDeletable)
113                 {
114                     (entity as ILogicallyDeletable).IsDeleted = false;
115                 }
116             }
117         }
118 
119         public EFCoreRepository(TDbContext dbContext)
120         {
121             this.dbContext = dbContext;
122             dbSet = this.dbContext.Set<TEntity>();
123         }
124 
125         public virtual void Add(TEntity entity)
126         {
127             dbSet.Add(entity);
128         }
129 
130         public virtual Task AddAsync(TEntity entity, CancellationToken cancellationToken = default)
131         {
132             return dbSet.AddAsync(entity, cancellationToken).AsTask();
133         }
134 
135         public virtual void AddRange(IEnumerable<TEntity> entities)
136         {
137             dbSet.AddRange(entities);
138         }
139 
140         public virtual Task AddRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
141         {
142             return dbSet.AddRangeAsync(entities, cancellationToken);
143         }
144 
145         public virtual void Delete(TEntity entity, bool isSoftDelete)
146         {
147             dbSet.Attach(entity);
148             if (isSoftDelete)
149             {
150                 if (entity is ILogicallyDeletable)
151                 {
152                     (entity as ILogicallyDeletable).IsDeleted = true;
153                 }
154                 else
155                 {
156                     throw new InvalidOperationException($"要求軟刪除的實體不實現{nameof(ILogicallyDeletable)}接口。");
157                 }
158             }
159             else
160             {
161                 dbSet.Remove(entity);
162             }
163         }
164 
165         public virtual Task DeleteAsync(TEntity entity, bool isSoftDelete, CancellationToken cancellationToken = default)
166         {
167             Delete(entity, isSoftDelete);
168             return Task.CompletedTask;
169         }
170 
171         public virtual void DeleteRange(IEnumerable<TEntity> entities, bool isSoftDelete)
172         {
173             dbSet.AttachRange(entities);
174             foreach (var entity in entities)
175             {
176                 Delete(entity, isSoftDelete);
177             }
178         }
179 
180         public virtual Task DeleteRangeAsync(IEnumerable<TEntity> entities, bool isSoftDelete, CancellationToken cancellationToken = default)
181         {
182             DeleteRange(entities, isSoftDelete);
183             return Task.CompletedTask;
184         }
185 
186         public virtual TEntity Find(TEntity entity, bool ignoreNullValue)
187         {
188             var exp = GenerateWhere(dbContext, entity, ignoreNullValue);
189 
190             return Set.SingleOrDefault(exp);
191         }
192 
193         public virtual Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue)
194         {
195             var exp = GenerateWhere(dbContext, entity, ignoreNullValue);
196 
197             return Set.SingleOrDefaultAsync(exp);
198         }
199 
200         public virtual int SaveChanges()
201         {
202             ProcessChangedEntity();
203             return dbContext.SaveChanges();
204         }
205 
206         public virtual Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
207         {
208             ProcessChangedEntity();
209             return dbContext.SaveChangesAsync(cancellationToken);
210         }
211 
212         public virtual IQueryable<TEntity> Set => dbSet.AsNoTracking();
213 
214         public virtual void Update(TEntity entity)
215         {
216             ResetDeletedMark(entity);
217             dbSet.Update(entity);
218         }
219 
220         public virtual Task UpdateAsync(TEntity entity, CancellationToken cancellationToken = default)
221         {
222             Update(entity);
223             return Task.CompletedTask;
224         }
225 
226         public virtual void UpdateRange(IEnumerable<TEntity> entities)
227         {
228             ResetDeletedMark(entities.ToArray());
229             dbSet.UpdateRange(entities);
230         }
231 
232         public virtual Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
233         {
234             UpdateRange(entities);
235             return Task.CompletedTask;
236         }
237 
238         static private Expression<Func<TEntity, bool>> GenerateWhere(TDbContext dbContext, TEntity entity, bool ignoreNullValue)
239         {
240             //查找實體類型主鍵
241             var model = dbContext.Model.FindEntityType(typeof(TEntity));
242             var key = model.FindPrimaryKey();
243 
244             //查找全部主鍵屬性,若是沒有主鍵就使用全部實體屬性
245             IEnumerable<PropertyInfo> props;
246             if (key != null)
247             {
248                 props = key.Properties.Select(x => x.PropertyInfo);
249             }
250             else
251             {
252                 props = model.GetProperties().Select(x => x.PropertyInfo);
253             }
254 
255             //生成表達式參數
256             ParameterExpression parameter = Expression.Parameter(typeof(TEntity), "x");
257 
258             //初始化提取實體類型全部屬性信息生成屬性訪問表達式幷包裝備用
259             var keyValues = props.Select(x => new { key = x, value = x.GetValue(entity), propExp = Expression.Property(parameter, x) });
260             //初始化存儲由基礎類型組成的屬性信息(只要個空集合,實際數據在後面的循環中填充)
261             var primitiveKeyValues = keyValues.Take(0).Where(x => IsPrimitiveType(x.key.PropertyType));
262             //初始化基礎類型屬性的相等比較表達式存儲集合(只要個空集合,實際數據在後面的循環中填充)
263             var equals = primitiveKeyValues.Take(0).Select(x => Expression.Equal(x.propExp, Expression.Constant(x.value)));
264             //初始化複雜類型屬性存儲集合
265             var notPrimitiveKeyValues = primitiveKeyValues;
266 
267             //若是還有元素,說明上次用於提取信息的複雜屬性內部還存在複雜屬性,接下來用提取到的基礎類型屬性信息生成相等比較表達式併合併到存儲集合而後繼續提取剩下的複雜類型屬性的內部屬性
268             while (keyValues.Count() > 0)
269             {
270                 if (ignoreNullValue)
271                 {
272                     keyValues = keyValues.Where(x => x.value != null);
273                 }
274                 //提取由基礎類型組成的屬性信息
275                 primitiveKeyValues = keyValues.Where(x => IsPrimitiveType(x.key.PropertyType));
276                 //生成基礎類型屬性的相等比較表達式
277                 equals = equals.Concat(primitiveKeyValues.Select(x => Expression.Equal(x.propExp, Expression.Constant(x.value))));
278                 //提取複雜類型屬性
279                 notPrimitiveKeyValues = keyValues.Except(primitiveKeyValues);
280                 //分別提取各個複雜類型屬性內部的屬性信息繼續生成內部屬性訪問表達式
281                 keyValues =
282                     from kv in notPrimitiveKeyValues
283                     from propInfo in kv.value.GetType().GetProperties()
284                     select new { key = propInfo, value = propInfo.GetValue(kv.value), propExp = Expression.Property(kv.propExp, propInfo) };
285             }
286 
287             //若是相等比較表達式有多個,將全部相等比較表達式用 && 運算鏈接起來
288             var and = equals.First();
289             foreach (var eq in equals.Skip(1))
290             {
291                 and = Expression.AndAlso(and, eq);
292             }
293 
294             //生成完整的過濾條件表達式,形如:  (TEntity x) => { return x.a == ? && x.b == ? && x.obj1.m == ? && x.obj1.n == ? && x.obj2.u.v == ?; }
295             var exp = Expression.Lambda<Func<TEntity, bool>>(and, parameter);
296 
297             //判斷某個類型是不是基礎數據類型
298             static bool IsPrimitiveType(Type type)
299             {
300                 var primitiveTypes = new[] {
301                     typeof(sbyte)
302                     ,typeof(byte)
303                     ,typeof(short)
304                     ,typeof(ushort)
305                     ,typeof(int)
306                     ,typeof(uint)
307                     ,typeof(long)
308                     ,typeof(ulong)
309                     ,typeof(float)
310                     ,typeof(double)
311                     ,typeof(decimal)
312                     ,typeof(char)
313                     ,typeof(string)
314                     ,typeof(bool)
315                     ,typeof(DateTime)
316                     ,typeof(DateTimeOffset)
317                     //,typeof(Enum)
318                     ,typeof(Guid)};
319 
320                 var tmp =
321                     type.IsDerivedFrom(typeof(Nullable<>))
322                     ? Nullable.GetUnderlyingType(type)
323                     : type;
324 
325                 return tmp.IsEnum || primitiveTypes.Contains(tmp);
326             }
327 
328             return exp;
329         }
330     }

命令

       命令基類

 1     public abstract class MediatRCommand : MediatRCommand<Unit>, ICommand, IRequest
 2     {
 3     }
 4 
 5     public abstract class MediatRCommand<TResult> : ICommand<TResult>, IRequest<TResult>
 6     {
 7         public Guid Id { get; }
 8 
 9         public DateTimeOffset Timestamp { get; }
10 
11         public MediatRCommand()
12         {
13             Id = Guid.NewGuid();
14             Timestamp = DateTimeOffset.Now;
15         }
16     }

       示例具體命令,命令只包含參數信息,如何使用參數信息完成任務是命令處理器的事

 1     public class ListUserCommand : MediatRCommand<IPagedList<ApplicationUser>>
 2     {
 3         public PageInfo PageInfo { get; }
 4         public QueryFilter QueryFilter { get; }
 5         public ListUserCommand(PageInfo pageInfo, QueryFilter queryFilter)
 6         {
 7             PageInfo = pageInfo;
 8             QueryFilter = queryFilter;
 9         }
10     }

命令總線

 1     public class MediatRCommandBus<TCommand, TResult> : ICommandBus<TCommand, TResult>
 2         where TCommand : MediatRCommand<TResult>
 3     {
 4         private readonly IMediator mediator;
 5         private readonly ICommandStore commandStore;
 6 
 7         public MediatRCommandBus(IMediator mediator, ICommandStore commandStore)
 8         {
 9             this.mediator = mediator;
10             this.commandStore = commandStore;
11         }
12 
13         public virtual Task<TResult> SendCommandAsync(TCommand command, CancellationToken cancellationToken = default)
14         {
15             commandStore?.SaveAsync(command, cancellationToken);
16             return mediator.Send(command, cancellationToken);
17         }
18 
19         Task ICommandBus<TCommand>.SendCommandAsync(TCommand command, CancellationToken cancellationToken)
20         {
21             return SendCommandAsync(command, cancellationToken);
22         }
23     }
24 
25     public class MediatRCommandBus<TCommand> : MediatRCommandBus<MediatRCommand<Unit>, Unit>
26         where TCommand : MediatRCommand<Unit>
27     {
28         public MediatRCommandBus(IMediator mediator, ICommandStore commandStore) : base(mediator, commandStore)
29         {
30         }
31     }

命令處理器

       命令處理器基類

 1     public abstract class MediatRCommandHandler<TCommand, TResult> : ICommandHandler<TCommand, TResult>, IRequestHandler<TCommand, TResult>
 2     where TCommand : MediatRCommand<TResult>
 3     {
 4         public abstract Task<TResult> Handle(TCommand command, CancellationToken cancellationToken = default);
 5 
 6         Task ICommandHandler<TCommand>.Handle(TCommand command, CancellationToken cancellationToken)
 7         {
 8             return Handle(command, cancellationToken);
 9         }
10     }
11 
12     public abstract class MediatRCommandHandler<TCommand> : MediatRCommandHandler<TCommand, Unit>
13         where TCommand : MediatRCommand
14     {
15     }

       具體命令處理器示例,使用注入的倉儲查詢數據,ApplicationUser 在這裏就是事實上的聚合根實體

 1     public class ListUserCommandHandler : MediatRCommandHandler<ListUserCommand, IPagedList<ApplicationUser>>
 2     {
 3         private IEFCoreRepository<ApplicationUser, int, ApplicationIdentityDbContext> repository;
 4 
 5         public ListUserCommandHandler(IEFCoreRepository<ApplicationUser, int, ApplicationIdentityDbContext> repository)
 6         {
 7             this.repository = repository;
 8         }
 9 
10         public override Task<IPagedList<ApplicationUser>> Handle(ListUserCommand command, CancellationToken cancellationToken = default)
11         {
12             return repository.Set
13                 .OrderBy(x => x.Id)
14                 .ToPagedListAsync(command.PageInfo.PageNumber, command.PageInfo.PageSize);
15         }
16     }

命令存儲

       什麼都沒幹,實際使用時可使用數據庫保存相關信息

 1     public class InProcessCommandStore : ICommandStore<bool>
 2     {
 3         public bool Save(ICommand command)
 4         {
 5             return SaveAsync(command).Result;
 6         }
 7 
 8         public Task<bool> SaveAsync(ICommand command, CancellationToken cancellationToken = default)
 9         {
10             return Task.FromResult(true);
11         }
12 
13         void ICommandStore.Save(ICommand command)
14         {
15             Save(command);
16         }
17 
18         Task ICommandStore.SaveAsync(ICommand command, CancellationToken cancellationToken)
19         {
20             return SaveAsync(command, cancellationToken);
21         }
22     }

       事件部分和命令基本相同,具體代碼能夠到文章末尾下載項目代碼查看。

使用

       在 Startup.ConfigureServices 方法中註冊相關服務,事件總線和命令總線都使用 MediatR 實現。.Net Core 內置 DI 支持註冊泛型服務,因此某個實體在實際使用時注入泛型倉儲就表示這個實體是聚合根,不用提早定義具體的聚合根實體倉儲,因此刪除使用代碼至關於刪除了倉儲定義。

1  services.AddScoped(typeof(ICommandBus<>), typeof(MediatRCommandBus<>));
2  services.AddScoped(typeof(ICommandBus<,>), typeof(MediatRCommandBus<,>));
3  services.AddScoped(typeof(ICommandStore), typeof(InProcessCommandStore));
4  services.AddScoped(typeof(IEventBus), typeof(MediatREventBus));
5  services.AddScoped(typeof(IEventBus<>), typeof(MediatREventBus<>));
6  services.AddScoped(typeof(IEventStore), typeof(InProcessEventStore));
7  services.AddScoped(typeof(IEFCoreRepository<,>), typeof(EFCoreRepository<,>));
8  services.AddScoped(typeof(IEFCoreRepository<,,>), typeof(EFCoreRepository<,,>));
9  services.AddMediatR(typeof(ListUserCommandHandler).GetTypeInfo().Assembly);

       示例使用比較簡單,就不定義服務了,若是須要定義服務,那麼使用服務的通常是命令處理器,倉儲由服務使用。這裏命令處理器直接使用倉儲。在控制器中注入命令總線,向命令總線發送命令就能夠獲取結果。MediatR 會自動根據發送的命令類型查找匹配的命令處理器去調用。

 1     [ApiController]
 2     [Route("api/[controller]")]
 3     public class UsersController : ControllerBase
 4     {
 5         private readonly ICommandBus<ListUserCommand, IPagedList<ApplicationUser>> _commandBus;
 6         private readonly IMapper _mapper;
 7 
 8         public UsersController(ICommandBus<ListUserCommand, IPagedList<ApplicationUser>> commandBus, IMapper mapper)
 9         {
10             _commandBus = commandBus;
11             _mapper = mapper;
12         }
13 
14         /// <summary>
15         /// 獲取用戶列表
16         /// </summary>
17         /// <param name="page">頁碼</param>
18         /// <param name="size">每頁條目數</param>
19         /// <returns>用戶列表</returns>
20         [HttpGet]
21         [Produces("application/json")] //聲明接口響應 json 數據
22         public async Task<IActionResult> GetAsync(int? page, int? size)
23         {
24             var cmd = new ListUserCommand(new PageInfo(page ?? 1, size ?? 10), new QueryFilter());
25             var users = await _commandBus.SendCommandAsync(cmd, default);
26 
27             return new JsonResult(
28                 new
29                 {
30                     rows = users.Select(u => _mapper.Map<ApplicationUserDto>(u)),
31                     total = users.PageCount, //總頁數
32                     page = users.PageNumber, //當前頁碼
33                     records = users.TotalItemCount //總記錄數
34                 }
35             );
36         }
37     }

       使用就是這麼簡單。使用者根本不須要知道命令處理器的存在,把命令發送到總線,等着接收結果就能夠了。

       事件通常由命令處理器引起,能夠改造命令處理器用 DI 注入事件總線,而後在命令處理器中向事件總線發送事件,事件總線就會自動觸發相應的事件處理器。

結語

       完整的流程大概就是:控制器使用注入的服務執行業務流程,業務服務向命令總線發送命令,命令總線觸發處理器處理命令,命令處理器向事件總線發送事件,事件總線觸發事件處理器處理事件,事件處理器在處理事件後向事件總線發送新的事件觸發後續事件處理器繼續處理新的事件(若是須要),直到最後不發送事件的事件處理器完成處理。整個流程完結。在此過程當中總線會自動調用注入的總線消息存儲來持久化命令和事件,至此,一個環環相扣的極簡 DDD+CQRS+EDA+ES 架構搭建完成!

       想要實際體驗的朋友能夠到文章末尾下載項目並運行體驗。啓動調試後訪問 /swagger 而後嘗試體驗調用 api/users 接口。

 

       轉載請完整保留如下內容並在顯眼位置標註,未經受權刪除如下內容進行轉載盜用的,保留追究法律責任的權利!

  本文地址:http://www.javashuo.com/article/p-roogpijp-cr.html

  完整源代碼:Github

  裏面有各類小東西,這只是其中之一,不嫌棄的話能夠Star一下。

相關文章
相關標籤/搜索