使用EventNext實現基於事件驅動的業務處理

事件驅動模型相信對你們來講並不陌生,由於這是一套很是高效的邏輯處理模型,經過事件來驅動接下來須要完成的工做,而不像傳統同步模型等待任務完成後再繼續!雖然事件驅動有着這樣的好處,但在傳統設計上基於消息回調的處理方式在業務處理中相對比較麻煩總體設計成本也比較高,因此落地也不容易。EventNext是一個事件驅動的應用框架,它的事件驅動支持接口調用,在一系列的業務接口調用過程當中經過事件驅動調用來完成;簡單來講組件驅動的接口行爲是由上一接口行爲完成而觸發執行,接下來介紹詳細介紹一下EventNext和使用。git

NextQueue

EventNext組件有一個核心的事件驅動隊列NextQueue,NextQueue和傳統的線程隊列有着很大的區別;傳統隊列都是線程不停的執行消息,下一個消息都是線程等待上一個消息完成後再繼續。但NextQueue的設計則不是,它的全部消息都基於上一個消息完成來驅動(無論上一個消息的邏輯是同步仍是異步)。實際狀況是NextQueue觸發任務的消息是啓用線程工做外,後面的消息都是基於上一個消息回調執行;NextQueue上的消息執行線程是不肯定性也不須要等待,雖然隊列裏的消息執行線程不是惟一的,但執行順序是一致的這也是NextQueue所帶來的好處,在有序的狀況下確保線程的利用率更高。github

組件使用

在使用組前須要引用組件,Nuget安裝以下api

Install-Package EventNext

經過組件制定的業務必須以接口的方式來描述,而業務的調用也是經過接口的方式進行;雖然組件支持以消息回調的方式便不建議這樣作,畢竟面向業務接口有着更好的易用性和可維護性。爲了確保業務接口方式 的行爲知足事件驅動隊列的要求 ,全部業務行爲方法必須以Task做爲返回值;非Task返回值的行爲方法都不能被組件註冊和調用。安全

接口定義和實現

接口的定義有必定的規則,除了方法返回值是Task外,也不支持同一名稱的函數進行重載,若是有須要可使用特定的Attribute來標記對應的名稱(out類型參數不被支持)。如下是一個簡單的接口定義:多線程

 1     public interface IUserService
 2     {
 3 
 4         Task<int> Income(int value);
 5 
 6         Task<int> Payout(int value);
 7 
 8         Task<int> Amount();
 9 
10     }

業務實現:併發

 1     [Service(typeof(IUserService))]
 2     public class UserService :  IUserService
 3     {
 4         private int mAmount;
 5 
 6         public Task<int> Amount()
 7         {
 8             return Task.FromResult(mAmount);
 9         }
10 
11         public Task<int> Income(int value)
12         {
13             mAmount += value;
14             return Task.FromResult(mAmount);
15         }
16 
17         public Task<int> Payout(int value)
18         {
19             mAmount -= value;
20             return Task.FromResult(mAmount);
21         }
22 
23     }

須要經過ServiceAttribute來描這個類提供那些事件驅動的接口行爲。框架

使用

組件經過一個EventCenter的對象來進行邏輯調用,建立該對象並註冊相應業務功能的程序集便可:異步

EventCenter eventCenter = new EventCenter();
eventCenter.Register(typeof(Program).Assembly);

定義EventCenter加載邏輯後就能夠建立代理接口調用async

var service=EventCenter.Create<IUserService>();
await server.Payout(10);
await server.Income(10);

事件驅動隊列分配

組件針對不一樣狀況的須要,能夠給接口實例或方法定義不一樣的事件隊列配置,主要爲如下幾種狀況函數

默認

由組件內部隊列組進行負載狀況進行配置,這種分配方式會致使同一接口的方法有可能分配在不一樣的隊列上;在默認分配下接口實例的方法會存在多線程中同時的運行,所以這種模式的應用並非線程安全。

Actor

Actor相信你們也很熟悉,一種高性能一致性的調度模型;組件支持這種模型的接口實例建立,只須要在建立接口代理的時候指定Actor名稱便可

henry = EventCenter.Create<IUserService>("henry");

當指定Actor名稱後,這個接口的全部方法調用都會一致性到對應實例的隊列中,即全部功能方法線程調用的惟一性;在接口調用返回的時候也會再次切入到其餘事件驅動隊列,確保Actor內部的工做隊列不受響後的應邏輯影響;當使用這種方式時整個Actor實例都是線程安全的。

ThreadPool

這種配置只適用於接口方法,描述方法不管什麼狀況都從線程池中執行相關代碼,此行爲的方法非線程安全

1 [ThreadInvoke(ThreadType.ThreadPool)]
2 public Task<int> ThreadInvoke()
3 {
4             mCount++;
5             return mCount.ToTask();
6 }

SingleQueue

這種配置只適用於接口方法,用於描述方法無論那個實例都一致性到一個隊列中,此行爲的方法內線程安全,不保證對應實例是線程安全.

 1         [ThreadInvoke(ThreadType.SingleQueue)]
 2         public Task<int> GetID([ThreadUniqueID]string name)
 3         {
 4             if (!mValues.TryGetValue(name, out int value))
 5             {
 6                 value = 1;
 7             }
 8             else
 9             {
10                 value++;
11             }
12             mValues[name] = value;
13             return value.ToTask();
14         }

在這配置下還能夠再細分,如上面的[ThreadUniqueID]對不一樣參數作一致性對列,這個時候name的不一樣值會一致性到不一樣的事件隊列中。

Actor性能對比

組件默認集成了Actor模型,能夠經過它實現高併發無鎖業務集成,EventNext最大的特色是以接口的方式集成應用,相對於akka.net基於消息接收的模式來講有着明顯的應用優點。在性能上EventNext基於接口的ask機制也比akka.net基於消息receive的ask機制要高,如下是一個簡單的對比測試

akak.net

 1  public class UserActor : ReceiveActor
 2     {
 3         public UserActor()
 4         {
 5             Receive<Income>(Income =>
 6             {
 7                 mAmount += Income.Memory;
 8                 this.Sender.Tell(mAmount);
 9             });
10             Receive<Payout>(Outlay =>
11             {
12                 mAmount -= Outlay.Memory;
13                 this.Sender.Tell(mAmount);
14             });
15             Receive<Get>(Outlay =>
16             {
17                 this.Sender.Tell(mAmount);
18             });
19         }
20         private decimal mAmount;
21     }
22     //invoke
23     Income income = new Income { Memory = i };
24     var result = await nbActor.Ask<decimal>(income);
25     Payout payout = new Payout { Memory = i };
26     var result = await nbActor.Ask<decimal>(payout);

Event Next

 1     [Service(typeof(IUserService))]
 2     public class UserService : IUserService
 3     {
 4         private int mAmount;   
 5 
 6         public Task<int> Amount()
 7         {
 8             return Task.FromResult(mAmount);
 9         }
10 
11         public Task<int> Income(int value)
12         {
13             mAmount += value;
14             return Task.FromResult(mAmount);
15         }
16 
17         public Task<int> Payout(int value)
18         {
19             mAmount -= value;
20             return Task.FromResult(mAmount);
21         }
22     }
23     //invoke
24     var result = await nb.Income(i);
25     var result = await nb.Payout(i);

詳細測試代碼https://github.com/IKende/EventNext/tree/master/samples/EventNext_AkkaNet 在默認配置下不一樣併發下的測試結果

 

Event Sourcing

因爲事件驅動提倡的業務處理都是異步,這樣就帶來一個業務事務性的問題,如何確保不一樣接口方法業務處理一致性就比較關鍵了。因爲不一樣的邏輯在不一樣線程中異步進行,因此相對比較好解決的就是在業務處理時引入Event Sourcing.如下就簡單介紹一下組件這方面的應用,就不詳細介紹了。畢竟 Event Sourcing設計和業務還有着一些關係

 1         public async Task<long> Income(int amount)
 2         {
 3             await EventCenter.WriteEvent(this, null, null, new { History = user.Amount, Change = amount, Value = user.Amount + amount });
 4             user.Amount += amount;
 5             return user.Amount;
 6         }
 7 
 8         public async Task<long> Pay(int amount)
 9         {
10             await EventCenter.WriteEvent(this, null, null, new { History = user.Amount, Change = -amount, Value = user.Amount - amount });
11             user.Amount -= amount;
12             return user.Amount;
13         }

組件提供事件信息的讀寫接口IEventLogHandler能夠經過實現這個接口擴展本身的事件源處理。

使用注意事項

適應async/await

其實整個事件隊列都是使用async/await,經過它大大簡化了消息和回調函數間不一樣數據狀態整合的難度。.Net也現有所異步API都支持async/wait

異步化設計你的邏輯

在實現接口邏輯的狀況儘量使和異步邏輯方法,在邏輯實施過程當中禁用Task.Wait或一些線程相關Wait的方法,特別不帶超時的Wait由於這種操做極容易致使事件驅動隊列邏輯被掛起,致使隊列沒法正常工做;更糟糕的狀況可能引發事件隊列假死的狀況。

傳統異步API

因爲各類緣由,可能還存在舊的異步API不支持async/wait,出現這狀況能夠經過TaskCompletionSource來擴展已經有的異步方法支持async/wait

項目地址

https://github.com/IKende/EventNext

相關文章
相關標籤/搜索