【說明:博主採用邊寫邊思考的方式完成這一系列的博客,因此代碼以附件爲準,文中代碼僅爲了說明。】html
在學習和實現CQRS的過程當中,首要參考的項目是這個【http://www.cnblogs.com/yangecnu/p/Introduction-CQRS.html】。因此Dpfb.Cqrs中的總體結構都是參考這個例子來的,在這個基礎之上添加和改進。總的來講,.Cqrs項目的總體結構以下所示:web
主要包含了(命令,事件,通訊,命令處理,事件處理這幾個方面)。具體的角色則以下圖所示:編程
通訊中包含了事件總線和命令總線。因爲查詢入口(QueryEntry)如何處置暫時沒想好,因此先放一個文件夾賣萌。緩存
此時的目標是實現一個最小單元的CQRS(主要是命令和事件部分)。其中命令總線和事件總線的實現比較固定,他們的職責就是爲命令和事件找到對應的處理類,而後依次調用對象的接口方法,從而將通常的直接調用「打斷」,同時提供各類額外操做的注入點。爲了更好的控制他們如何尋找處理類,這裏引入兩個接口ICommandHandlerSearcher以及IEventHandlerSearcher。放在項目的Configuration名稱空間下面。如今,能夠爲這些接口提供一個基礎的實現,供測試使用。如下是接口定義:性能優化
public interface ICommand { Guid Id { get; set; } } public interface IEvent { Guid Id { get; set; } } public interface IEventHandler<T> where T : IEvent { void Handle(T @event); } public interface ICommandHandler<T> where T: ICommand { void Execute(T command); } public interface ICommandBus { void Send<T>(T command) where T : ICommand; } public interface IEventBus { void Publish<T>(T @event) where T : IEvent; } public interface ICommandHandlerSearcher { ICommandHandler<T> Find<T>() where T : ICommand; } public interface IEventHandlerSearcher { IEnumerable<IEventHandler<T>> Find<T>() where T : IEvent; }
以及通訊(Buses)部分的實現:數據結構
public class DpfbEventBus : IEventBus { void IEventBus.Publish<T>(T @event) { foreach (var handler in EventHandlerSearcher.Find<T>()) { handler.Handle(@event); } } public IEventHandlerSearcher EventHandlerSearcher { get; set; } } public class DpfbCommandBus : ICommandBus { void ICommandBus.Send<T>(T command) { var handler = CommandHandlerSearcher.Find<T>(); handler.Execute(command); } public ICommandHandlerSearcher CommandHandlerSearcher { get; set; } }
其中,兩個ISearcheres的實現包含了如何尋找Handlers以及到哪裏尋找Handlers,這些具體的內容,.Cqrs這層其實並不關心。處於測試的目的,將這些實現代碼移到Test名稱空間下。並實現本程序集內的查找:併發
public class TestCommandHandlerSearcher:ICommandHandlerSearcher { ICommandHandler<T> ICommandHandlerSearcher.Find<T>() { var assembly = Assembly.GetCallingAssembly(); var declaredType = typeof (ICommandHandler<T>); var handlers = assembly.GetTypes() .Where(i => declaredType.IsAssignableFrom(i)) .Select(i => Activator.CreateInstance(i)) .ToArray(); if (handlers.Count() != 1) throw new Exception(); return handlers.First() as ICommandHandler<T>; } } public class TestEventHandlerSearcher : IEventHandlerSearcher { IEnumerable<IEventHandler<T>> IEventHandlerSearcher.Find<T>() { var assembly = Assembly.GetCallingAssembly(); var declaredType = typeof (IEventHandler<T>); var handlers = assembly.GetTypes() .Where(i => declaredType.IsAssignableFrom(i)) .Select(i => Activator.CreateInstance(i)); return handlers.Cast<IEventHandler<T>>(); } }
接下來添加一些實現代碼,以進行測試:框架
public class TestEventTwo : IEvent { Guid IEvent.Id { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } } public class TestEventOne : IEvent { Guid IEvent.Id { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } } public class TestEventHandler : IEventHandler<TestEventOne>, IEventHandler<TestEventTwo> { void IEventHandler<TestEventOne>.Handle(TestEventOne @event) { Console.WriteLine("處理了事件eventOne"); Configuration.EventBus.Publish(new TestEventTwo()); Console.WriteLine("引起了事件eventOne"); } void IEventHandler<TestEventTwo>.Handle(TestEventTwo @event) { Console.WriteLine("處理了事件eventTwo"); } } public class TestCommandHandler : ICommandHandler<TestCommand> { void ICommandHandler<TestCommand>.Execute(TestCommand command) { Console.WriteLine("獲取並處理消息:" + command.Message); var eventOne = new TestEventOne(); Configuration.EventBus.Publish(eventOne); } } public class TestCommand : ICommand { Guid ICommand.Id { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } public string Message { get; set; } } public static class Configuration { public static ICommandBus CommandBus = new DpfbCommandBus(); public static IEventBus EventBus = new DpfbEventBus(); }
其中,Buses做爲單例存在於一個靜態類的字段中。
最後,用一個單元測試運行:異步
[TestMethod] public void TestMethod1() { var command = new TestCommand {Message = "國慶記得回家"}; Configuration.CommandBus.Send(command); }
結果(倒序是正常的,總的來講,事件鏈的執行會是一個深度優先的調用):ide
在上個測試例子中,使用了控制檯輸出來代表各個事件的執行順序。在正常開發過程當中,咱們總不能處處輸出到控制檯,就算輸出了也不見得有用。因此咱們使用另外的方法來實現這一點。博主第一次接觸審計這個概念是在接觸一個名爲ABP的框架的時候【此處應有連接】,裏面包含了不少的信息(用戶身份,性能計數,時間等...),當時很是震驚。因而就把ABP源碼中的一個文件夾扒了下來本身用。ABP實現了Service方法的審計,用的是動態代理(Castle)。而Cqrs自己就留了無數的注入點,因此實現起來更加直觀和方便。另外,博主同時參考了ABP的Session的實現,而後搞出一個土鱉版的Session。雖然比較無恥,可是對於之後ABP的理解應該會有幫助。
博主當初匆匆忙忙的擼了一個CQRS的框架,只是想試水,因此沒有想不少,Auditing部分真的是直接扒下來改了改,如今開始寫博客,有更多的時間思考,因此打算從頭開始實現一個。首先開始抽象,Auditing是一種消息,同時還要考慮它的存放問題,因此定義瞭如下兩個接口(因爲Abudting還包含了其餘方面,如web,全部接口定義移到Dpfb層):
public interface IAuditInfo { } public interface IAuditStorage { IEnumerable<IAuditInfo> Retrive(); }
順便實現一個基於內存的存儲:
public class MemoryAuditStorage:IAuditStorage { private List<IAuditInfo> _inMemory = new List<IAuditInfo>(); IEnumerable<IAuditInfo> IAuditStorage.Retrive() { return _inMemory; } void IAuditStorage.Save(IAuditInfo auditInfo) { _inMemory.Add(auditInfo); } }
接下來實現Cqrs的命令和時間的審計對象。因爲事件的調用鏈是棵樹,這裏在Dpfb層引入一些必須的數據結構(見DaaStructure名稱空間)。如下是針對Cqrs的AuditInfo實現:
public class CommandEventAuditInfo : ExtendedTreeNode<CommandEventAuditInfo>, IAuditInfo { public DateTime InvokedTime { get; set; } /// <summary> /// 單位爲毫秒 /// </summary> public int InvokingDuration { get; set; } public IDpfbSession DpfbSession { get; set; } public Type CommandEventHandlerType { get; set; } public Type CommandEventType { get; set; } public Type DeclaredHandlerType { get; set; } //todo 待擴展 public object ThreadInfo { get; set; } public bool Stopped { get; private set; } private Stopwatch _stopwatch = new Stopwatch(); public void Start() { _stopwatch.Start(); } public void Stop() { _stopwatch.Stop(); InvokingDuration = (int) _stopwatch.ElapsedMilliseconds; Stopped = true; Current = Parent; } public static ConcurrentDictionary<int, CommandEventAuditInfo> ConcurrentDic = new ConcurrentDictionary<int, CommandEventAuditInfo>(); public override string ToString() { return PrintTree(); } public string PrintTree() { var sb = new StringBuilder(); var userStr = DpfbSession != null ? DpfbSession.ToString() : "匿名用戶"; var @abstract = string.Format("命令事件調用分析[{3}]:用戶[{1}]引起了[{0}],總耗時[{2}]毫秒。調用鏈:", CommandEventType.Name, userStr, InvokingDuration, InvokedTime); sb.Append(@abstract); var recursionDepth = 1; return PrintTree(this, ref recursionDepth, sb); } private string PrintTree(CommandEventAuditInfo auditInfo, ref int recursionDepth, StringBuilder sb) { sb.AppendLine(); var span = recursionDepth == 0 ? "" : string.Join("", new string[recursionDepth].Select(i => " ").ToArray()); sb.Append(span + "|---"); sb.AppendFormat("[{2}]處理[{0}],耗時[{1}毫秒]", auditInfo.CommandEventType.Name, auditInfo.InvokingDuration, auditInfo.CommandEventType.Name); if (auditInfo.Children.Any()) recursionDepth++; foreach (var commandEventAuditInfo in auditInfo.Children) { PrintTree(commandEventAuditInfo, ref recursionDepth, sb); } return sb.ToString(); } private static CommandEventAuditInfo CreateUnstoppedOnCurrentThread() { Func<int, CommandEventAuditInfo> creator = k => new CommandEventAuditInfo() { ThreadInfo = k }; var threadName = Thread.CurrentThread.ManagedThreadId; var auditInfo = ConcurrentDic.GetOrAdd(threadName, creator); if (auditInfo.Stopped) { return ConcurrentDic.AddOrUpdate(threadName, creator, (k, nv) => creator((int) nv.ThreadInfo)); } return auditInfo; } public static CommandEventAuditInfo StartNewForCommand<TCommand>(Type handlerType) where TCommand : ICommand { var root = CreateUnstoppedOnCurrentThread(); root.InvokedTime = DateTime.Now; root.CommandEventHandlerType = handlerType; root.CommandEventType = typeof (TCommand); root.DeclaredHandlerType = typeof (ICommandHandler<TCommand>); Current = root; return root; } public static CommandEventAuditInfo StartNewForEvent<TEvent>(Type handlerType) where TEvent : IEvent { var auditInfo = new CommandEventAuditInfo() { CommandEventType = typeof (TEvent), CommandEventHandlerType = handlerType, DeclaredHandlerType = typeof (IEventHandler<TEvent>), InvokedTime = DateTime.Now }; Current.Children.Add(auditInfo); auditInfo.Parent = Current; return auditInfo; } public static CommandEventAuditInfo Root { get { return CreateUnstoppedOnCurrentThread(); } } public static CommandEventAuditInfo Current { get; private set; } }
稍做修改以後,從新運行單元測試:
[TestMethod] public void TestAuditing() { var command = new TestCommand {Message = "國慶記得回家"}; Configuration.CommandBus.Send(command); command = new TestCommand {Message = "國慶記得回家"}; Configuration.CommandBus.Send(command); foreach (var auditInfo in Configuration.AuditStorage.Retrive()) { Console.WriteLine(auditInfo.ToString()); } }
這裏是運行結果[事件調用鏈的關係有誤,請移步第二篇查看]:
至於Session,此時實現並不能表示其做用,因此只是定義了一個接口。
至此,一個能夠發揮做用的Cqrs就完成了。同時也遺留下一些須要思考的問題:
在博主的目前實現中,是不考慮事件的繼承關係的。
使用Handler實現方法上的Attribute實現,這個僅僅做爲輔助功能實現。邏輯上的前後順序由事件鏈指定。
目前博主使用Unity做IoC,基於線程的生命週期管理,會存在併發問題(ASP.NET),須要繼續考慮。
實際上調用鏈在編譯期間就肯定了,因此,可使用ExpressionTree做緩存。
在ASP.NET中,使用基於TPL的一套異步編程框架能夠提升吞吐。這個能夠大體這麼理解:在web系統中,存在兩種角色,請求入口(IIS)【標記爲A】,以及請求處理者(實現代碼)【標記爲B】。咱們先假定整個系統只有一個A。那麼,在改進以前,狀況是「A接到了一個請求告訴B,而後看着B把事情幹完,再把結果告訴請求方」。改進以後的狀況是「A接到了一個請求B,告訴B處理完事情以後給個反饋——A立刻開始處理其餘請求。」顯然,在改進以前A的等待時間很長,改進以後A當了甩手掌櫃(只等報告),等待時間很短。因而乎,A單位時間能的處理量就上去了,吞吐就上去了。然而,對於一次請求而言,時長取決於B,除非B的工做效率有所改進,否則並不會提高性能。
這些是博主的我的理解,並不求多少準確,僅但願能大體描述這麼一個意思。
然而博主作了不少測試,並無反應出這一點。詳細內容,請移步這篇文章:【此處應有連接】
這是ABP作的好的地方,也是工做量最大的地方,配置的優先級等。
此篇完成時,所使用的代碼:【http://pan.baidu.com/s/1o6IeNXK】