在 Abp 框架內部實現了工做單元,在這裏講解一下,什麼是工做單元?html
Unit Of Work(工做單元)模式用來維護一個由已經被業務事物修改(增長、刪除或更新)的業務對象組成的列表。Unit Of Work模式負責協調這些修改的持久化工做以及全部標記的併發問題。在數據訪問層中採用Unit Of Work模式帶來的好處是可以確保數據完整性。若是在持久化一系列業務對象(他們屬於同一個事物)的過程當中出現問題,那麼應該將全部的修改回滾,以確保數據始終處於有效狀態。數據庫
而在 Abp 的內部則是結合 Castle 的 Dynamic Proxy 攔截 UnitOfwork Attribute 來進行動態代理注入,實現了當執行標註了 [UnitOfwork]
方法時可以經過 UnitOfworkManager
來進行事務控制。併發
其大概流程以下:app
首先咱們來看一下 Abp 內部是何時注入 UOW 相關的代碼的,翻閱源碼,在 AbpBootstrapper
內部咱們就能夠看到 Abp 做者爲 UOW 寫了一個攔截器,而且在 Abp 框架初始化的時候就經過 AddInterceptorRegistrars()
方法來監聽 IocManager
的組件註冊事件,當觸發事件的時候就來判斷是否知足條件,若是知足則將攔截器與該類型進行一個綁定。框架
public class AbpBootstrapper : IDisposable { private AbpBootstrapper([NotNull] Type startupModule, [CanBeNull] Action<AbpBootstrapperOptions> optionsAction = null) { // 其餘代碼 if (!options.DisableAllInterceptors) { // 添加攔截器 AddInterceptorRegistrars(); } } private void AddInterceptorRegistrars() { ValidationInterceptorRegistrar.Initialize(IocManager); AuditingInterceptorRegistrar.Initialize(IocManager); EntityHistoryInterceptorRegistrar.Initialize(IocManager); UnitOfWorkRegistrar.Initialize(IocManager); AuthorizationInterceptorRegistrar.Initialize(IocManager); } }
internal static class UnitOfWorkRegistrar { public static void Initialize(IIocManager iocManager) { // 監聽組件註冊事件 iocManager.IocContainer.Kernel.ComponentRegistered += (key, handler) => { var implementationType = handler.ComponentModel.Implementation.GetTypeInfo(); // 按 UOW 特性註冊 HandleTypesWithUnitOfWorkAttribute(implementationType, handler); // 按規約註冊 HandleConventionalUnitOfWorkTypes(iocManager, implementationType, handler); }; } private static void HandleTypesWithUnitOfWorkAttribute(TypeInfo implementationType, IHandler handler) { if (IsUnitOfWorkType(implementationType) || AnyMethodHasUnitOfWork(implementationType)) { handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor))); } } private static void HandleConventionalUnitOfWorkTypes(IIocManager iocManager, TypeInfo implementationType, IHandler handler) { if (!iocManager.IsRegistered<IUnitOfWorkDefaultOptions>()) { return; } var uowOptions = iocManager.Resolve<IUnitOfWorkDefaultOptions>(); if (uowOptions.IsConventionalUowClass(implementationType.AsType())) { handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor))); } } private static bool IsUnitOfWorkType(TypeInfo implementationType) { return UnitOfWorkHelper.HasUnitOfWorkAttribute(implementationType); } private static bool AnyMethodHasUnitOfWork(TypeInfo implementationType) { return implementationType .GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic) .Any(UnitOfWorkHelper.HasUnitOfWorkAttribute); } }
能夠看到在這個 Registrar 裏面他擁有兩種註冊方式,第一種很簡單,就是判斷註冊的組件類型是否擁有 UOW 標籤,第二種則是經過規約來注入攔截器。異步
Abp 默認針對倉儲與應用服務會自動將攔截器掛載到這兩個類型以及他的全部子類的。這裏的 UnitOfWorkDefaultOptionsExtensions.IsConventionalUowClass()
方法就是用來判斷傳入的 Type 是否屬於規約的 Type。async
public static bool IsConventionalUowClass(this IUnitOfWorkDefaultOptions unitOfWorkDefaultOptions, Type type) { return unitOfWorkDefaultOptions.ConventionalUowSelectors.Any(selector => selector(type)); }
又牽扯到了一個 IUnitOfWorkDefaultOptions
,看一下他的默認實現 UnitOfWorkDefaultOptions
就會發現這樣的代碼:ide
public UnitOfWorkDefaultOptions() { _filters = new List<DataFilterConfiguration>(); IsTransactional = true; Scope = TransactionScopeOption.Required; IsTransactionScopeAvailable = true; // 默認類型 ConventionalUowSelectors = new List<Func<Type, bool>> { type => typeof(IRepository).IsAssignableFrom(type) || typeof(IApplicationService).IsAssignableFrom(type) }; }
在上一步咱們經過兩種注入方式將攔截器注入到須要應用工做單元特性的類型裏面,那麼咱們程序在執行的時候就會使用 Dyncmic Proxy 來攔截包裹這些方法。post
下面咱們就來看一下剛剛注入的攔截器:ui
internal class UnitOfWorkInterceptor : IInterceptor { // ... // 其餘代碼 public void Intercept(IInvocation invocation) { MethodInfo method; try { method = invocation.MethodInvocationTarget; } catch { method = invocation.GetConcreteMethod(); } // 判斷當前進入的方法是否帶有 UnitOfWork 特性 var unitOfWorkAttr = _unitOfWorkOptions.GetUnitOfWorkAttributeOrNull(method); if (unitOfWorkAttr == null || unitOfWorkAttr.IsDisabled) { // 沒有則直接執行該方法 invocation.Proceed(); return; } PerformUow(invocation, unitOfWorkAttr.CreateOptions()); } // ... // 其餘代碼 }
攔截器內部方法很簡單,若是是 UOW 方法則執行 PerformUow()
便可,在該方法內部則對方法類型進行了不一樣的判斷,同步與異步的處理方法是不同的。
// ... // 其餘代碼 private void PerformUow(IInvocation invocation, UnitOfWorkOptions options) { // 判斷方法是同步仍是異步方法,不一樣則執行不一樣的處理操做 if (invocation.Method.IsAsync()) { PerformAsyncUow(invocation, options); } else { PerformSyncUow(invocation, options); } } // ... // 其餘代碼
那麼咱們就先來看一下同步方法:
// ... // 其餘代碼 private void PerformSyncUow(IInvocation invocation, UnitOfWorkOptions options) { using (var uow = _unitOfWorkManager.Begin(options)) { // 繼續執行 invocation.Proceed(); uow.Complete(); } } // ... // 其餘代碼
同步方法針對 UOW 的操做十分簡單,直接使用 UnitOfWorkManager.Begin()
方法開啓一個事務,而後在內部執行原有方法的代碼,執行完成以後調用 Complete()
完成這次調用。
假如我擁有兩個應用服務類,他們都擁有 UnitOfWork
特性,而後我再一個 A 方法調用他們兩個 B 類的 Run()
方法,而 B類的內部也調用了C 的 Run()
方法,大致以下:
public class A { private readonly B B; public A(B b) { B = b; } public void TestMethod() { B.Run(); } } internal class B { private readonly C C; public B(C c) { C = c; } [UnitOfWork] public void Run() { // 數據庫操做 C.Run(); Console.WriteLine("B 的 Run 方法被調用."); } } internal class C { [UnitOfWork] public void Run() { Console.WriteLine("C 的 Run 方法被調用."); } }
而後在攔截器內部的執行過程就相似於下面這種:
internal class UnitOfWorkInterceptor { public void TestMethod() { using (var uow = _unitOfWorkManager.Begin(options)) { using(var uow2 = _unitOfWorkManager.Begin(options)) { // C 方法的代碼 Console.WriteLine("C 的 Run 方法被調用."); uow2.Complete(); } // B 方法的代碼 Console.WriteLine("B 的 Run 方法被調用."); uow.Complete(); } } }
兩個工做單元之間的調用會被嵌套在一個 using 語句塊之中,一旦任何代碼拋出了異常,都會致使最外層的 uow.Complete()
不會被執行,而 Complete()
方法沒有執行,則會致使 uow 對象被釋放的時候,uow.Dispose()
內部檢測到 Complete()
沒有被調用,Abp 框架也會本身拋出異常。
因此 Abp 巧妙結合 Castle Dynamic 實現了 UOW 模式。
下面咱們繼續看一下他是如何處理異步 UOW 方法的。
private void PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options) { var uow = _unitOfWorkManager.Begin(options); try { invocation.Proceed(); } catch { uow.Dispose(); throw; } // 若是是無返回值的異步方法 if (invocation.Method.ReturnType == typeof(Task)) { invocation.ReturnValue = InternalAsyncHelper.AwaitTaskWithPostActionAndFinally( (Task) invocation.ReturnValue, async () => await uow.CompleteAsync(), exception => uow.Dispose() ); } // 有返回值的異步方法處理 else { invocation.ReturnValue = InternalAsyncHelper.CallAwaitTaskWithPostActionAndFinallyAndGetResult( invocation.Method.ReturnType.GenericTypeArguments[0], invocation.ReturnValue, async () => await uow.CompleteAsync(), exception => uow.Dispose() ); } }
相比而言,針對攔截到的異步方法處理起來更加複雜一點,可是整體思路仍然是同樣的,將這些工做單元的方法一層層地嵌套起來,依次執行就是核心。而在上面代碼裏面,同樣的首先使用 UnitOfManager.Begin()
得到了一個新的工做單元以後,繼續執行原有的操做,下面則主要是經過內部的 InternalAsyncHelper
封裝的兩個輔助方法來確保等待原有任務執行完成以後,再執行 CompleteAsync()
方法。
咱們能夠來看一下這個內部類的實現:
// 異步無返回值處理 public static async Task AwaitTaskWithPostActionAndFinally(Task actualReturnValue, Func<Task> postAction, Action<Exception> finalAction) { Exception exception = null; try { // 等待原有任務執行完成 await actualReturnValue; // 執行 CompleteAsync() 表示本工做單元已經順利執行 await postAction(); } // 捕獲異常 catch (Exception ex) { exception = ex; throw; } finally { // 不管是否拋出異常,都調用以前傳入的 uow.Dispose() 方法 finalAction(exception); } } // 原理基本同上,只是多了一個返回值 public static async Task<T> AwaitTaskWithPostActionAndFinallyAndGetResult<T>(Task<T> actualReturnValue, Func<Task> postAction, Action<Exception> finalAction) { Exception exception = null; try { var result = await actualReturnValue; await postAction(); return result; } catch (Exception ex) { exception = ex; throw; } finally { finalAction(exception); } } // 異步有返回值處理 public static object CallAwaitTaskWithPostActionAndFinallyAndGetResult(Type taskReturnType, object actualReturnValue, Func<Task> action, Action<Exception> finalAction) { // 這裏經過反射獲取到 AwaitTaskWithPostActionAndFinallyAndGetResult 方法,並調用。 return typeof (InternalAsyncHelper) .GetMethod("AwaitTaskWithPostActionAndFinallyAndGetResult", BindingFlags.Public | BindingFlags.Static) .MakeGenericMethod(taskReturnType) .Invoke(null, new object[] { actualReturnValue, action, finalAction }); }
並不複雜,以上便是攔截器所作的操做。
經過上文咱們能夠看到一個工做單元是經過 IUnitOfWorkManager.Begin()
拿到的,那 IUnitOfWorkManager
又是個什麼東西呢?
根據字面意思咱們大概知道應該相似於管理 UOW 的東西,它其實只有兩個做用。第一,獲取當前處於激活狀態的工做單元,什麼叫激活狀態咱們後面再講。第二個做用就是咱們以前看到的,能夠經過 Begin()
方法來建立一個新的工做單元。
IUnitOfWorkManager
在 Abp 框架初始化的時候就被注入了,其默認實現爲 UnitOfWorkManager
,其核心方法就是 Begin()
方法。
public IUnitOfWorkCompleteHandle Begin(UnitOfWorkOptions options) { // 若是沒有傳入 UOW 參數,則填充一個默認的參數 options.FillDefaultsForNonProvidedOptions(_defaultOptions); // 獲取當前的外部工做單元 var outerUow = _currentUnitOfWorkProvider.Current; // 若是已經存在有外部工做單元,則直接構建一個內部工做單元 if (options.Scope == TransactionScopeOption.Required && outerUow != null) { return new InnerUnitOfWorkCompleteHandle(); } // 不存在外部工做單元,則從 IOC 容器當中獲取一個新的出來 var uow = _iocResolver.Resolve<IUnitOfWork>(); // 綁定外部工做單元的事件 uow.Completed += (sender, args) => { _currentUnitOfWorkProvider.Current = null; }; uow.Failed += (sender, args) => { _currentUnitOfWorkProvider.Current = null; }; uow.Disposed += (sender, args) => { _iocResolver.Release(uow); }; // 設置過濾器 if (outerUow != null) { options.FillOuterUowFiltersForNonProvidedOptions(outerUow.Filters.ToList()); } uow.Begin(options); // 綁定租戶 ID if (outerUow != null) { uow.SetTenantId(outerUow.GetTenantId(), false); } // 設置當前的外部工做單元爲剛剛初始化的工做單元 _currentUnitOfWorkProvider.Current = uow; return uow; }
能夠看到 Begin()
方法返回的是一個類型爲 IUnitOfWorkCompleteHandle
的東西,轉到其定義:
/// <summary> /// Used to complete a unit of work. /// This interface can not be injected or directly used. /// Use <see cref="IUnitOfWorkManager"/> instead. /// </summary> public interface IUnitOfWorkCompleteHandle : IDisposable { /// <summary> /// Completes this unit of work. /// It saves all changes and commit transaction if exists. /// </summary> void Complete(); /// <summary> /// Completes this unit of work. /// It saves all changes and commit transaction if exists. /// </summary> Task CompleteAsync(); }
他只有兩個方法,都是標識 UOW 處於已經完成的狀態。
在方法上面右鍵查看其實現能夠看到有這樣一種依賴關係:
能夠看到 IUnitOfWorkCompleteHandle
有兩個實現,一個是 InnerUnitOfWorkCompleteHandle
還有一個則是 IUnitOfWork
接口。
首先看一下 InnerUnitOfWorkCompleteHandle
:
internal class InnerUnitOfWorkCompleteHandle : IUnitOfWorkCompleteHandle { public const string DidNotCallCompleteMethodExceptionMessage = "Did not call Complete method of a unit of work."; private volatile bool _isCompleteCalled; private volatile bool _isDisposed; public void Complete() { _isCompleteCalled = true; } public Task CompleteAsync() { _isCompleteCalled = true; return Task.FromResult(0); } public void Dispose() { if (_isDisposed) { return; } _isDisposed = true; if (!_isCompleteCalled) { if (HasException()) { return; } throw new AbpException(DidNotCallCompleteMethodExceptionMessage); } } private static bool HasException() { try { return Marshal.GetExceptionCode() != 0; } catch (Exception) { return false; } } }
代碼很簡單,調用 Complete()/CompleteAsync()
會將 _isCompleteCalled 置爲 true,而後在 Dispose()
方法內會進行檢測,爲 faslse 的話直接拋出異常。能夠看到在 InnerUnitOfWorkCompleteHandle
內部並不會真正地調用 DbContext.SaveChanges()
進行數據保存。
那麼誰纔是真正進行數據庫操做的工做單元呢?
答案就是以前在 IUnitOfWorkManager.Begin()
裏面,能夠看到在建立 UOW 對象的時候,他在內部進行了一個判斷,若是不存在外部工做單元的狀況下才會建立 InnerUnitOfWorkCompleteHandle
對象,不然是解析的一個 IUnitOfWork
對象。
也就是說你能夠想象有如下代碼:
public void TestUowMethod() { using(var outerUOW = Manager.Begin()) // 這裏返回的是 IOC 解析出的 IUnitOfWork { OperationOuter(); using(var innerUOW1 = Manager.Begin()) // 內部 UOW { Operation1(); using(var innerUOW2 = Manager.Begin()) // 內部 UOW { Operation2(); Complete(); } Complete(); } Complete(); } }
當代碼執行的時候,如同俄羅斯套娃,從內部依次到外部執行,內部工做單元僅會在調用 Complete 方法的時候將 completed 標記爲 true,但一旦操做拋出異常,Complete()
沒法獲得執行,則會直接拋出異常,中斷外層代碼執行。
在 ABP 內部針對 EF Core 框架實現了一套 UOW,其繼承自 UnitOfWorkBase
,而在 UnitOfWorkBase
內部有部分針對接口 IActiveUnitOfWork
的實現,同時因爲 IUnifOfWork
也實現了 IUnitOfWorkCompleteHandle
接口,因此在 Begin()
方法處可以向上轉型。
根據上圖能夠知道 Abp 默認實現了一個 UnitOfWorkBase
做爲工做單元的抽象基類,他主要的屬性就是 Id 與 Outer 屬性。
public abstract class UnitOfWorkBase : IUnitOfWork { public string Id { get; } [DoNotWire] public IUnitOfWork Outer { get; set; } }
這裏的 Id 是使用的 Guid 生成的,用於標識每一個工做單元。
而 Outer 則是當前 UOW 對象的引用對象。
這裏重點說一下 Outer 是哪兒來的,Outer 他的值是在以前的 UnitOfWorkManager.Begin()
裏面的 _currentUnitOfWorkProvider.Current = uow;
進行設置的,_currentUnitOfWorkProvider
的實如今 AsyncLocalCurrentUnitOfWorkProvider
內部,其做用是維護一個 UOW 鏈,確保當前的工做單元始終是最新的,這裏的代碼本來是使用 CallContext
實現的,如今已經換爲 AsyncLocal<T>
了。
public class AsyncLocalCurrentUnitOfWorkProvider : ICurrentUnitOfWorkProvider, ITransientDependency { /// <inheritdoc /> [DoNotWire] public IUnitOfWork Current { get { return GetCurrentUow(); } set { SetCurrentUow(value); } } public ILogger Logger { get; set; } private static readonly AsyncLocal<LocalUowWrapper> AsyncLocalUow = new AsyncLocal<LocalUowWrapper>(); public AsyncLocalCurrentUnitOfWorkProvider() { Logger = NullLogger.Instance; } private static IUnitOfWork GetCurrentUow() { var uow = AsyncLocalUow.Value?.UnitOfWork; if (uow == null) { return null; } if (uow.IsDisposed) { AsyncLocalUow.Value = null; return null; } return uow; } private static void SetCurrentUow(IUnitOfWork value) { lock (AsyncLocalUow) { if (value == null) { if (AsyncLocalUow.Value == null) { return; } if (AsyncLocalUow.Value.UnitOfWork?.Outer == null) { AsyncLocalUow.Value.UnitOfWork = null; AsyncLocalUow.Value = null; return; } AsyncLocalUow.Value.UnitOfWork = AsyncLocalUow.Value.UnitOfWork.Outer; } else { if (AsyncLocalUow.Value?.UnitOfWork == null) { if (AsyncLocalUow.Value != null) { AsyncLocalUow.Value.UnitOfWork = value; } AsyncLocalUow.Value = new LocalUowWrapper(value); return; } value.Outer = AsyncLocalUow.Value.UnitOfWork; AsyncLocalUow.Value.UnitOfWork = value; } } } private class LocalUowWrapper { public IUnitOfWork UnitOfWork { get; set; } public LocalUowWrapper(IUnitOfWork unitOfWork) { UnitOfWork = unitOfWork; } } }
繼續往下看,在 UnitOfWorkBase
的裏面也是有個 Complete()
與 CompleteAsync()
方法的。
protected abstract void CompleteUow(); /// <inheritdoc/> public void Complete() { // 判斷是否重複完成 PreventMultipleComplete(); try { CompleteUow(); _succeed = true; OnCompleted(); } catch (Exception ex) { _exception = ex; throw; } }
這裏的 CompleteUow()
仍然只是一個抽象方法,具體的實如今具體的訪問層裏面。
Abp 針對 EF Core 的 UOW 實現是 EfCoreUnitOfWork
,代碼以下:
protected override void BeginUow() { if (Options.IsTransactional == true) { _transactionStrategy.InitOptions(Options); } } public override void SaveChanges() { foreach (var dbContext in GetAllActiveDbContexts()) { SaveChangesInDbContext(dbContext); } } public override async Task SaveChangesAsync() { // 遍歷全部激活的 DbContext foreach (var dbContext in GetAllActiveDbContexts()) { await SaveChangesInDbContextAsync(dbContext); } } protected override void CompleteUow() { SaveChanges(); CommitTransaction(); } protected override async Task CompleteUowAsync() { await SaveChangesAsync(); CommitTransaction(); } private void CommitTransaction() { if (Options.IsTransactional == true) { _transactionStrategy.Commit(); } } public IReadOnlyList<DbContext> GetAllActiveDbContexts() { return ActiveDbContexts.Values.ToImmutableList(); }
根本就是遍歷 DbContext
調用其 SaveChanges()
來提交全部數據庫更改。
餘下更加詳細的東西會放在 《7、倉儲與 Entity Framework Core》 當中說明的。