CQRS學習——最小單元的Cqrs(CommandEvent)[其一]


【說明:博主採用邊寫邊思考的方式完成這一系列的博客,因此代碼以附件爲準,文中代碼僅爲了說明。】html

結構

在學習和實現CQRS的過程當中,首要參考的項目是這個【http://www.cnblogs.com/yangecnu/p/Introduction-CQRS.html】。因此Dpfb.Cqrs中的總體結構都是參考這個例子來的,在這個基礎之上添加和改進。總的來講,.Cqrs項目的總體結構以下所示:web

主要包含了(命令,事件,通訊,命令處理,事件處理這幾個方面)。具體的角色則以下圖所示:編程

通訊中包含了事件總線和命令總線。因爲查詢入口(QueryEntry)如何處置暫時沒想好,因此先放一個文件夾賣萌。緩存

實現

此時的目標是實現一個最小單元的CQRS(主要是命令和事件部分)。其中命令總線和事件總線的實現比較固定,他們的職責就是爲命令和事件找到對應的處理類,而後依次調用對象的接口方法,從而將通常的直接調用「打斷」,同時提供各類額外操做的注入點。爲了更好的控制他們如何尋找處理類,這裏引入兩個接口ICommandHandlerSearcher以及IEventHandlerSearcher。放在項目的Configuration名稱空間下面。如今,能夠爲這些接口提供一個基礎的實現,供測試使用。如下是接口定義:性能優化

    public interface ICommand { Guid Id { get; set; } } public interface IEvent { Guid Id { get; set; } } public interface IEventHandler<T> where T : IEvent { void Handle(T @event); } public interface ICommandHandler<T> where T: ICommand { void Execute(T command); } public interface ICommandBus { void Send<T>(T command) where T : ICommand; } public interface IEventBus { void Publish<T>(T @event) where T : IEvent; } public interface ICommandHandlerSearcher { ICommandHandler<T> Find<T>() where T : ICommand; } public interface IEventHandlerSearcher { IEnumerable<IEventHandler<T>> Find<T>() where T : IEvent; }
接口定義

以及通訊(Buses)部分的實現:數據結構

public class DpfbEventBus : IEventBus
    {
        void IEventBus.Publish<T>(T @event)
        {
            foreach (var handler in EventHandlerSearcher.Find<T>())
            {
                handler.Handle(@event);
            }
        }

        public IEventHandlerSearcher EventHandlerSearcher { get; set; }
    }

public class DpfbCommandBus : ICommandBus
    {
        void ICommandBus.Send<T>(T command)
        {
            var handler = CommandHandlerSearcher.Find<T>();
            handler.Execute(command);
        }

        public ICommandHandlerSearcher CommandHandlerSearcher { get; set; }
    }
Buses實現

其中,兩個ISearcheres的實現包含了如何尋找Handlers以及到哪裏尋找Handlers,這些具體的內容,.Cqrs這層其實並不關心。處於測試的目的,將這些實現代碼移到Test名稱空間下。並實現本程序集內的查找:併發

 public class TestCommandHandlerSearcher:ICommandHandlerSearcher
    {
        ICommandHandler<T> ICommandHandlerSearcher.Find<T>()
        {
            var assembly = Assembly.GetCallingAssembly();
            var declaredType = typeof (ICommandHandler<T>);
            var handlers = assembly.GetTypes()
                .Where(i => declaredType.IsAssignableFrom(i))
                .Select(i => Activator.CreateInstance(i))
                .ToArray();
            if (handlers.Count() != 1)
                throw new Exception();
            return handlers.First() as ICommandHandler<T>;
        }
    }

 public class TestEventHandlerSearcher : IEventHandlerSearcher
    {
        IEnumerable<IEventHandler<T>> IEventHandlerSearcher.Find<T>()
        {
            var assembly = Assembly.GetCallingAssembly();
            var declaredType = typeof (IEventHandler<T>);
            var handlers = assembly.GetTypes()
                .Where(i => declaredType.IsAssignableFrom(i))
                .Select(i => Activator.CreateInstance(i));
            return handlers.Cast<IEventHandler<T>>();
        }
    }
Searchers實現

測試

接下來添加一些實現代碼,以進行測試:框架

    public class TestEventTwo : IEvent
    {
        Guid IEvent.Id
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }
    }

    public class TestEventOne : IEvent
    {
        Guid IEvent.Id
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }
    }

    public class TestEventHandler : IEventHandler<TestEventOne>,
        IEventHandler<TestEventTwo>
    {
        void IEventHandler<TestEventOne>.Handle(TestEventOne @event)
        {
            Console.WriteLine("處理了事件eventOne");
            Configuration.EventBus.Publish(new TestEventTwo());
            Console.WriteLine("引起了事件eventOne");
        }

        void IEventHandler<TestEventTwo>.Handle(TestEventTwo @event)
        {
            Console.WriteLine("處理了事件eventTwo");
        }
    }

    public class TestCommandHandler : ICommandHandler<TestCommand>
    {
        void ICommandHandler<TestCommand>.Execute(TestCommand command)
        {
            Console.WriteLine("獲取並處理消息:" + command.Message);
            var eventOne = new TestEventOne();
            Configuration.EventBus.Publish(eventOne);
        }
    }

    public class TestCommand : ICommand
    {
        Guid ICommand.Id
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }

        public string Message { get; set; }
    }

    public static class Configuration
    {
        public static ICommandBus CommandBus = new DpfbCommandBus();

        public static IEventBus EventBus = new DpfbEventBus();
    }
測試代碼

其中,Buses做爲單例存在於一個靜態類的字段中。
最後,用一個單元測試運行:異步

[TestMethod]
        public void TestMethod1()
        {
            var command = new TestCommand {Message = "國慶記得回家"};
            Configuration.CommandBus.Send(command);
        }
UnitTest

結果(倒序是正常的,總的來講,事件鏈的執行會是一個深度優先的調用):ide

審計(以及Session)

在上個測試例子中,使用了控制檯輸出來代表各個事件的執行順序。在正常開發過程當中,咱們總不能處處輸出到控制檯,就算輸出了也不見得有用。因此咱們使用另外的方法來實現這一點。博主第一次接觸審計這個概念是在接觸一個名爲ABP的框架的時候【此處應有連接】,裏面包含了不少的信息(用戶身份,性能計數,時間等...),當時很是震驚。因而就把ABP源碼中的一個文件夾扒了下來本身用。ABP實現了Service方法的審計,用的是動態代理(Castle)。而Cqrs自己就留了無數的注入點,因此實現起來更加直觀和方便。另外,博主同時參考了ABP的Session的實現,而後搞出一個土鱉版的Session。雖然比較無恥,可是對於之後ABP的理解應該會有幫助。

博主當初匆匆忙忙的擼了一個CQRS的框架,只是想試水,因此沒有想不少,Auditing部分真的是直接扒下來改了改,如今開始寫博客,有更多的時間思考,因此打算從頭開始實現一個。首先開始抽象,Auditing是一種消息,同時還要考慮它的存放問題,因此定義瞭如下兩個接口(因爲Abudting還包含了其餘方面,如web,全部接口定義移到Dpfb層):

public interface IAuditInfo
    {

    }

 public interface IAuditStorage
    {
        IEnumerable<IAuditInfo> Retrive();
    }
Auditing接口

順便實現一個基於內存的存儲:

 public class MemoryAuditStorage:IAuditStorage
    {
        private List<IAuditInfo> _inMemory = new List<IAuditInfo>();

        IEnumerable<IAuditInfo> IAuditStorage.Retrive()
        {
            return _inMemory;
        }


        void IAuditStorage.Save(IAuditInfo auditInfo)
        {
            _inMemory.Add(auditInfo);
        }
    }
MemoryAuditStorage

接下來實現Cqrs的命令和時間的審計對象。因爲事件的調用鏈是棵樹,這裏在Dpfb層引入一些必須的數據結構(見DaaStructure名稱空間)。如下是針對Cqrs的AuditInfo實現:

public class CommandEventAuditInfo : ExtendedTreeNode<CommandEventAuditInfo>, IAuditInfo
    {
        public DateTime InvokedTime { get; set; }

        /// <summary>
        /// 單位爲毫秒
        /// </summary>
        public int InvokingDuration { get; set; }

        public IDpfbSession DpfbSession { get; set; }

        public Type CommandEventHandlerType { get; set; }

        public Type CommandEventType { get; set; }

        public Type DeclaredHandlerType { get; set; }

        //todo 待擴展
        public object ThreadInfo { get; set; }

        public bool Stopped { get; private set; }

        private Stopwatch _stopwatch = new Stopwatch();

        public void Start()
        {
            _stopwatch.Start();
        }

        public void Stop()
        {
            _stopwatch.Stop();
            InvokingDuration = (int) _stopwatch.ElapsedMilliseconds;
            Stopped = true;
            Current = Parent;
        }

        public static ConcurrentDictionary<int, CommandEventAuditInfo> ConcurrentDic =
            new ConcurrentDictionary<int, CommandEventAuditInfo>();

        public override string ToString()
        {
            return PrintTree();
        }

        public string PrintTree()
        {
            var sb = new StringBuilder();
            var userStr = DpfbSession != null ? DpfbSession.ToString() : "匿名用戶";
            var @abstract = string.Format("命令事件調用分析[{3}]:用戶[{1}]引起了[{0}],總耗時[{2}]毫秒。調用鏈:",
                CommandEventType.Name, userStr, InvokingDuration, InvokedTime);
            sb.Append(@abstract);
            var recursionDepth = 1;
            return PrintTree(this, ref recursionDepth, sb);
        }

        private string PrintTree(CommandEventAuditInfo auditInfo, ref int recursionDepth, StringBuilder sb)
        {
            sb.AppendLine();
            var span = recursionDepth == 0
                ? ""
                : string.Join("", new string[recursionDepth].Select(i => "         ").ToArray());
            sb.Append(span + "|---");
            sb.AppendFormat("[{2}]處理[{0}],耗時[{1}毫秒]", auditInfo.CommandEventType.Name,
                auditInfo.InvokingDuration, auditInfo.CommandEventType.Name);
            if (auditInfo.Children.Any())
                recursionDepth++;
            foreach (var commandEventAuditInfo in auditInfo.Children)
            {
                PrintTree(commandEventAuditInfo, ref recursionDepth, sb);
            }
            return sb.ToString();
        }

        private static CommandEventAuditInfo CreateUnstoppedOnCurrentThread()
        {
            Func<int, CommandEventAuditInfo> creator = k => new CommandEventAuditInfo()
            {
                ThreadInfo = k
            };
            var threadName = Thread.CurrentThread.ManagedThreadId;
            var auditInfo = ConcurrentDic.GetOrAdd(threadName, creator);
            if (auditInfo.Stopped)
            {
                return ConcurrentDic.AddOrUpdate(threadName, creator, (k, nv) => creator((int) nv.ThreadInfo));
            }
            return auditInfo;
        }

        public static CommandEventAuditInfo StartNewForCommand<TCommand>(Type handlerType) where TCommand : ICommand
        {
            var root = CreateUnstoppedOnCurrentThread();
            root.InvokedTime = DateTime.Now;
            root.CommandEventHandlerType = handlerType;
            root.CommandEventType = typeof (TCommand);
            root.DeclaredHandlerType = typeof (ICommandHandler<TCommand>);
            Current = root;
            return root;
        }

        public static CommandEventAuditInfo StartNewForEvent<TEvent>(Type handlerType) where TEvent : IEvent
        {
            var auditInfo = new CommandEventAuditInfo()
            {
                CommandEventType = typeof (TEvent),
                CommandEventHandlerType = handlerType,
                DeclaredHandlerType = typeof (IEventHandler<TEvent>),
                InvokedTime = DateTime.Now
            };
            Current.Children.Add(auditInfo);
            auditInfo.Parent = Current;
            return auditInfo;
        }

        public static CommandEventAuditInfo Root
        {
            get { return CreateUnstoppedOnCurrentThread(); }
        }

        public static CommandEventAuditInfo Current { get; private set; }
    }
AuditInfo4Cqrs

稍做修改以後,從新運行單元測試:

[TestMethod]
        public void TestAuditing()
        {
            var command = new TestCommand {Message = "國慶記得回家"};
            Configuration.CommandBus.Send(command);
            command = new TestCommand {Message = "國慶記得回家"};
            Configuration.CommandBus.Send(command);
            foreach (var auditInfo in Configuration.AuditStorage.Retrive())
            {
                Console.WriteLine(auditInfo.ToString());
            }
        }
TestAuditing

這裏是運行結果[事件調用鏈的關係有誤,請移步第二篇查看]:

 至於Session,此時實現並不能表示其做用,因此只是定義了一個接口。

補丁

至此,一個能夠發揮做用的Cqrs就完成了。同時也遺留下一些須要思考的問題:

【事件的繼承關係】

在博主的目前實現中,是不考慮事件的繼承關係的。

【事件的前後順序】

使用Handler實現方法上的Attribute實現,這個僅僅做爲輔助功能實現。邏輯上的前後順序由事件鏈指定。

【CqrsAuditng的生命週期】

目前博主使用Unity做IoC,基於線程的生命週期管理,會存在併發問題(ASP.NET),須要繼續考慮。

【Searchers的性能優化】

實際上調用鏈在編譯期間就肯定了,因此,可使用ExpressionTree做緩存。

【事件/命令異步執行的支持】

在ASP.NET中,使用基於TPL的一套異步編程框架能夠提升吞吐。這個能夠大體這麼理解:在web系統中,存在兩種角色,請求入口(IIS)【標記爲A】,以及請求處理者(實現代碼)【標記爲B】。咱們先假定整個系統只有一個A。那麼,在改進以前,狀況是「A接到了一個請求告訴B,而後看着B把事情幹完,再把結果告訴請求方」。改進以後的狀況是「A接到了一個請求B,告訴B處理完事情以後給個反饋——A立刻開始處理其餘請求。」顯然,在改進以前A的等待時間很長,改進以後A當了甩手掌櫃(只等報告),等待時間很短。因而乎,A單位時間能的處理量就上去了,吞吐就上去了。然而,對於一次請求而言,時長取決於B,除非B的工做效率有所改進,否則並不會提高性能。

這些是博主的我的理解,並不求多少準確,僅但願能大體描述這麼一個意思。

然而博主作了不少測試,並無反應出這一點。詳細內容,請移步這篇文章:【此處應有連接】

【Audting的配置】

這是ABP作的好的地方,也是工做量最大的地方,配置的優先級等。 

 

此篇完成時,所使用的代碼:【http://pan.baidu.com/s/1o6IeNXK

相關文章
相關標籤/搜索