在某些時候咱們可能會須要執行後臺任務,或者是執行一些週期性的任務。好比說可能每隔 1 個小時要清除某個臨時文件夾內的數據,可能用戶會要針對某一個用戶羣來羣發一組短信。前面這些就是典型的應用場景,在 Abp 框架裏面爲咱們準備了後臺做業和後臺工做者來幫助咱們解決這個問題。html
後臺做業與後臺工做者的區別是,前者主要用於某些耗時較長的任務,而不想阻塞用戶的時候所使用。後者主要用於週期性的執行某些任務,從 「工做者」 的名字能夠看出來,就是一個個工人,並且他們每一個工人都擁有單獨的後臺線程。數據庫
後臺做業網絡
後臺工做者框架
後臺做業與後臺工做者都是經過各自的 Manager(IBackgroundJobManager
/IBackgroundWorkerManager
) 來進行管理的。而這兩個 Manager 分別繼承了 ISingletonDependency
接口,因此在啓動的時候就會自動注入這兩個管理器以便開發人員管理操做。async
這裏值得注意的一點是,IBackgroundJobManager
接口是 IBackgroundWorker
的派生接口,而 IBackgroudWorker
是歸屬於 IBackgroundWorkerManager
進行管理的。ide
因此,你能夠在 AbpKernelModule
裏面看到以下代碼:函數
public sealed class AbpKernelModule : AbpModule { public override void PostInitialize() { // 註冊可能缺乏的組件 RegisterMissingComponents(); // ... 忽略的代碼 // 各類管理器的初始化操做 // 從配置項中讀取,是否啓用了後臺做業功能 if (Configuration.BackgroundJobs.IsJobExecutionEnabled) { var workerManager = IocManager.Resolve<IBackgroundWorkerManager>(); // 開始啓動後臺工做者 workerManager.Start(); // 增長後臺做業管理器 workerManager.Add(IocManager.Resolve<IBackgroundJobManager>()); } } }
能夠看到,後臺做業管理器是做爲一個後臺工做者被添加到了 IBackgroundWorkerManager
當中來執行的。this
Abp 經過後臺工做者管理器來管理後臺做業隊列,因此咱們首先來看一下後臺工做者管理器接口的定義是什麼樣子的。線程
public interface IBackgroundWorkerManager : IRunnable { void Add(IBackgroundWorker worker); }
仍是至關簡潔的,就一個 Add
方法用來添加一個新的後臺工做者對象。只是在這個地方,能夠看到該接口又是集成自 IRunnable
接口,那麼該接口的做用又是什麼呢?日誌
轉到其定義能夠看到,IRunable
接口定義了三個基本的方法:Start()
、Stop()
、WaitStop()
,並且他擁有一個默認實現 RunableBase
,其實就是用來標識一個任務的運行狀態。
public interface IRunnable { // 開始執行任務 void Start(); // 中止執行任務 void Stop(); // 阻塞線程,等待任務執行完成後標識爲中止。 void WaitToStop(); } public abstract class RunnableBase : IRunnable { // 用於標識任務是否運行的布爾值變量 public bool IsRunning { get { return _isRunning; } } private volatile bool _isRunning; // 啓動以後表示任務正在運行 public virtual void Start() { _isRunning = true; } // 中止以後表示任務結束運行 public virtual void Stop() { _isRunning = false; } public virtual void WaitToStop() { } }
到目前爲止整個代碼都仍是比較簡單清晰的,咱們接着看 IBackgroundWorkerManager
的默認實現 BackgroundWorkerManager
類,首先咱們看一下該類擁有哪些屬性與字段。
public class BackgroundWorkerManager : RunnableBase, IBackgroundWorkerManager, ISingletonDependency, IDisposable { private readonly IIocResolver _iocResolver; private readonly List<IBackgroundWorker> _backgroundJobs; public BackgroundWorkerManager(IIocResolver iocResolver) { _iocResolver = iocResolver; _backgroundJobs = new List<IBackgroundWorker>(); } }
在後臺工做者管理器類的內部,默認有一個 List 集合,用於維護全部的後臺工做者對象。那麼其餘的 Start()
等方法確定是基於這個集合進行操做的。
public override void Start() { base.Start(); _backgroundJobs.ForEach(job => job.Start()); } public override void Stop() { _backgroundJobs.ForEach(job => job.Stop()); base.Stop(); } public override void WaitToStop() { _backgroundJobs.ForEach(job => job.WaitToStop()); base.WaitToStop(); }
能夠看到實現仍是比較簡單的,接下來咱們繼續看他的 Add()
方法是如何進行操做的?
public void Add(IBackgroundWorker worker) { _backgroundJobs.Add(worker); if (IsRunning) { worker.Start(); } }
在這裏咱們看到他會針對 IsRunning
進行斷定是否當即啓動加入的後臺工做者對象。而這個 IsRunning
屬性值惟一產生變化的狀況就在於 Start()
方法與 Stop()
方法的調用。
最後確定也有相關的銷燬方法,用於釋放全部注入的後臺工做者對象,並將集合清除。
private bool _isDisposed; public void Dispose() { if (_isDisposed) { return; } _isDisposed = true; // 遍歷集合,經過 Ioc 解析器的 Release 方法釋放對象 _backgroundJobs.ForEach(_iocResolver.Release); // 清空集合 _backgroundJobs.Clear(); }
因此,針對於全部後臺工做者的管理,都是經過 IBackgroundWorkerManager
來進行操做的。
看完了管理器,咱們來看一下 IBackgroundWorker
後臺工做者對象是怎樣的構成。
public interface IBackgroundWorker : IRunnable { }
貌似只是一個空的接口,其做用主要是標識某個類型是否爲後臺工做者,轉到其抽象類實現 BackgroundWorkerBase
,裏面只是注入了一些輔助對象與本地化的一些方法。
public abstract class BackgroundWorkerBase : RunnableBase, IBackgroundWorker { // 配置管理器 public ISettingManager SettingManager { protected get; set; } // 工做單元管理器 public IUnitOfWorkManager UnitOfWorkManager { get { if (_unitOfWorkManager == null) { throw new AbpException("Must set UnitOfWorkManager before use it."); } return _unitOfWorkManager; } set { _unitOfWorkManager = value; } } private IUnitOfWorkManager _unitOfWorkManager; // 得到當前的工做單元 protected IActiveUnitOfWork CurrentUnitOfWork { get { return UnitOfWorkManager.Current; } } // 本地化資源管理器 public ILocalizationManager LocalizationManager { protected get; set; } // 默認的本地化資源的源名稱 protected string LocalizationSourceName { get; set; } protected ILocalizationSource LocalizationSource { get { // 若是沒有配置源名稱,直接拋出異常 if (LocalizationSourceName == null) { throw new AbpException("Must set LocalizationSourceName before, in order to get LocalizationSource"); } if (_localizationSource == null || _localizationSource.Name != LocalizationSourceName) { _localizationSource = LocalizationManager.GetSource(LocalizationSourceName); } return _localizationSource; } } private ILocalizationSource _localizationSource; // 日誌記錄器 public ILogger Logger { protected get; set; } protected BackgroundWorkerBase() { Logger = NullLogger.Instance; LocalizationManager = NullLocalizationManager.Instance; } // ... 其餘模板代碼 }
咱們接着看繼承並實現了 BackgroundWorkerBase
的類型 PeriodicBackgroundWorkerBase
,從字面意思上來看,該類型應該是一個定時後臺工做者基類。
重點在於 Periodic
(定時),從其類型內部的定義能夠看到,該類型使用了一個 AbpTimer
對象來進行週期計時與具體工做任務的觸發。咱們暫時先不看這個 AbpTimer
,僅僅看 PeriodicBackgroundWorkerBase
的內部實現。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase { protected readonly AbpTimer Timer; // 注入 AbpTimer protected PeriodicBackgroundWorkerBase(AbpTimer timer) { Timer = timer; // 綁定週期執行的任務,這裏是 DoWork() Timer.Elapsed += Timer_Elapsed; } public override void Start() { base.Start(); Timer.Start(); } public override void Stop() { Timer.Stop(); base.Stop(); } public override void WaitToStop() { Timer.WaitToStop(); base.WaitToStop(); } private void Timer_Elapsed(object sender, System.EventArgs e) { try { DoWork(); } catch (Exception ex) { Logger.Warn(ex.ToString(), ex); } } protected abstract void DoWork(); }
能夠看到,這裏基類綁定了 DoWork()
做爲其定時執行的方法,那麼用戶在使用的時候直接繼承自該基類,而後重寫 DoWork()
方法便可綁定本身的後臺工做者的任務。
在上面的基類咱們看到,基類的 Start()
、Stop()
、WaitTpStop()
方法都是調用的 AbpTimer
所提供的,因此說 AbpTimer
其實也繼承了 RunableBase
基類並實現其具體的啓動與中止操做。
其實 AbpTimer
的核心就是經過 CLR 的 Timer
來實現週期性任務執行的,不過默認的 Timer
類有兩個比較大的問題。
Timer
並不會等待你的任務執行完再執行下一個週期的任務,若是你的某個任務耗時過長,超過了 Timer
定義的週期。那麼 Timer
會開啓一個新的線程執行,這樣的話最後咱們系統的資源會由於線程大量重複建立而被拖垮。Timer
所執行的業務方法已經真正地被結束了。因此 Abp 纔會從新封裝一個 AbpTimer
做爲一個基礎的計時器。第一個問題的解決方法很簡單,就是在執行具體綁定的業務方法以前,經過 Timer.Change()
方法來讓 Timer
臨時失效。等待業務方法執行完成以後,再將 Timer
的週期置爲用戶設定的週期。
// CLR Timer 綁定的回調方法 private void TimerCallBack(object state) { lock (_taskTimer) { if (!_running || _performingTasks) { return; } // 暫時讓 Timer 失效 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 設置執行標識爲 TRUE,表示當前的 AbpTimer 正在執行 _performingTasks = true; } try { // 若是綁定了相應的觸發事件 if (Elapsed != null) { // 執行相應的業務方法,這裏就是最開始綁定的 DoWork() 方法 Elapsed(this, new EventArgs()); } } catch { } finally { lock (_taskTimer) { // 標識業務方法執行完成 _performingTasks = false; if (_running) { // 更改週期爲用戶指定的執行週期,等待下一次觸發 _taskTimer.Change(Period, Timeout.Infinite); } Monitor.Pulse(_taskTimer); } } }
針對於第二個問題,Abp 經過 WaitToStop()
方法會阻塞調用這個 Timer
的線程,而且在 _performingTasks
標識位是 false
的時候釋放。
public override void WaitToStop() { // 鎖定 CLR 的 Timer 對象 lock (_taskTimer) { // 循環檢測 while (_performingTasks) { Monitor.Wait(_taskTimer); } } base.WaitToStop(); }
至於其餘的 Start()
方法就是使用 CLR 的 Timer
更改其執行週期,而 Stop()
就是直接將 Timer
的週期設置爲無限大,使計時器失效。
Abp 後臺工做者的核心就是經過 AbpTimer
來實現週期性任務的執行,用戶只須要繼承自 PeriodicBackgroundWorkerBase
,而後將其添加到 IBackgroundWorkerManager
的集合當中。這樣 Abp 在啓動以後就會遍歷這個工做者集合,而後週期執行這些後臺工做者綁定的方法。
固然若是你繼承了 PeriodicBackgroundWorkerBase
以後,能夠經過設置構造函數的 AbpTimer
來指定本身的執行週期。
後臺工做隊列的管理是經過 IBackgroundJobManager
來處理的,而該接口又繼承自 IBackgroundWorker
,因此一整個後臺做業隊列就是一個後臺工做者,只不過這個工做者有點特殊。
IBackgroundJobManager
接口的定義其實就兩個方法,一個 EnqueueAsync<TJob, TArgs>()
用於將一個後臺做業加入到執行隊列當中。而 DeleteAsync()
方法呢,顧名思義就是從隊列當中移除指定的後臺做業。
首先看一下其默認實現 BackgroundJobManager
,該實現一樣是繼承自 PeriodicBackgroundWorkerBase
而且其默認週期爲 5000 ms。
public class BackgroundJobManager : PeriodicBackgroundWorkerBase, IBackgroundJobManager, ISingletonDependency { // 事件總線 public IEventBus EventBus { get; set; } // 輪訓後臺做業的間隔,默認值爲 5000 毫秒. public static int JobPollPeriod { get; set; } // IOC 解析器 private readonly IIocResolver _iocResolver; // 後臺做業隊列存儲 private readonly IBackgroundJobStore _store; static BackgroundJobManager() { JobPollPeriod = 5000; } public BackgroundJobManager( IIocResolver iocResolver, IBackgroundJobStore store, AbpTimer timer) : base(timer) { _store = store; _iocResolver = iocResolver; EventBus = NullEventBus.Instance; Timer.Period = JobPollPeriod; } }
基礎結構基本上就這個樣子,接下來看一下他的兩個接口方法是如何實現的。
EnqueueAsync<TJob, TArgs>
方法經過傳入指定的後臺做業對象和相應的參數,同時還有任務的優先級。將其經過 IBackgroundJobStore
進行持久化,並返回一個任務的惟一 JobId 以便進行刪除操做。
public async Task<string> EnqueueAsync<TJob, TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) where TJob : IBackgroundJob<TArgs> { // 經過 JobInfo 包裝任務的基本信息 var jobInfo = new BackgroundJobInfo { JobType = typeof(TJob).AssemblyQualifiedName, JobArgs = args.ToJsonString(), Priority = priority }; // 若是須要延時執行的話,則用當前時間加上延時的時間做爲任務下次運行的時間 if (delay.HasValue) { jobInfo.NextTryTime = Clock.Now.Add(delay.Value); } // 經過 Store 進行持久話存儲 await _store.InsertAsync(jobInfo); // 返回後臺任務的惟一標識 return jobInfo.Id.ToString(); }
至於刪除操做,在 Manager 內部其實也是經過 IBackgroundJobStore
進行實際的刪除操做的。
public async Task<bool> DeleteAsync(string jobId) { // 判斷 jobId 的值是否有效 if (long.TryParse(jobId, out long finalJobId) == false) { throw new ArgumentException($"The jobId '{jobId}' should be a number.", nameof(jobId)); } // 使用 jobId 從 Store 處篩選到 JobInfo 對象的信息 BackgroundJobInfo jobInfo = await _store.GetAsync(finalJobId); if (jobInfo == null) { return false; } // 若是存在有 JobInfo 則使用 Store 進行刪除操做 await _store.DeleteAsync(jobInfo); return true; }
後臺做業管理器實質上是一個週期性執行的後臺工做者,那麼咱們的後臺做業是每 5000 ms 執行一次,那麼他的 DoWork()
方法又在執行什麼操做呢?
protected override void DoWork() { // 從 Store 當中得到等待執行的後臺做業集合 var waitingJobs = AsyncHelper.RunSync(() => _store.GetWaitingJobsAsync(1000)); // 遍歷這些等待執行的後臺任務,而後經過 TryProcessJob 進行執行 foreach (var job in waitingJobs) { TryProcessJob(job); } }
能夠看到每 5 秒鐘咱們的後臺做業管理器就會從 IBackgroundJobStore
當中拿到最大 1000 條的後臺做業信息,而後遍歷這些信息。經過 TryProcessJob(job)
方法來執行後臺做業。
而 TryProcessJob()
方法,本質上就是經過反射構建出一個 IBackgroundJob
對象,而後取得序列化的參數值,經過反射獲得的 MethodInfo
對象來執行咱們的後臺任務。執行完成以後,就會從 Store 當中移除掉執行完成的任務。
針對於在執行過程中所出現的異常,會經過 IEventBus
觸發一個 AbpHandledExceptionData
事件記錄後臺做業執行失敗時的異常信息。而且一旦在執行過程中出現了任何異常的狀況,都會將該任務的 IsAbandoned
字段置爲 true
,當該字段爲 true
時,該任務將再也不回被執行。
PS:就是在
GetWaitingJobsAsync()
方法時,會過濾掉 IsAbandoned 值爲true
的任務。
private void TryProcessJob(BackgroundJobInfo jobInfo) { try { // 任務執行次數自增 1 jobInfo.TryCount++; // 最後一次執行時間設置爲當前時間 jobInfo.LastTryTime = Clock.Now; // 經過反射取得後臺做業的類型 var jobType = Type.GetType(jobInfo.JobType); // 經過 Ioc 解析器獲得一個臨時的後臺做業對象,執行完以後既被釋放 using (var job = _iocResolver.ResolveAsDisposable(jobType)) { try { // 經過反射獲得後臺做業的 Execute 方法 var jobExecuteMethod = job.Object.GetType().GetTypeInfo().GetMethod("Execute"); var argsType = jobExecuteMethod.GetParameters()[0].ParameterType; var argsObj = JsonConvert.DeserializeObject(jobInfo.JobArgs, argsType); // 結合持久話存儲的參數信息,調用 Execute 方法進行後臺做業 jobExecuteMethod.Invoke(job.Object, new[] { argsObj }); // 執行完成以後從 Store 刪除該任務的信息 AsyncHelper.RunSync(() => _store.DeleteAsync(jobInfo)); } catch (Exception ex) { Logger.Warn(ex.Message, ex); // 計算下一次執行的時間,一旦超過 2 天該任務都執行失敗,則返回 null var nextTryTime = jobInfo.CalculateNextTryTime(); if (nextTryTime.HasValue) { jobInfo.NextTryTime = nextTryTime.Value; } else { // 若是爲 null 則說明該任務在 2 天的時間內都沒有執行成功,則放棄繼續執行 jobInfo.IsAbandoned = true; } // 更新 Store 存儲的任務信息 TryUpdate(jobInfo); // 觸發異常事件 EventBus.Trigger( this, new AbpHandledExceptionData( new BackgroundJobException( "A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.", ex ) { BackgroundJob = jobInfo, JobObject = job.Object } ) ); } } } catch (Exception ex) { Logger.Warn(ex.ToString(), ex); // 表示任務再也不執行 jobInfo.IsAbandoned = true; // 更新 Store TryUpdate(jobInfo); } }
後臺做業的默認接口定義爲 IBackgroundJob<in TArgs>
,他只有一個 Execute(TArgs args)
方法,用於接收指定類型的做業參數,並執行。
通常來講咱們不建議直接經過繼承 IBackgroundJob<in TArgs>
來實現後臺做業,而是繼承自 BackgroundJob<TArgs>
抽象類。該抽象類內部也沒有什麼特別的實現,主要是注入了一些基礎設施,好比說 UOW 與 本地化資源管理器,方便咱們開發使用。
後臺做業自己是具體執行的對象,而 BackgroundJobInfo
則是存儲了後臺做業的 Type 類型和參數,方便在須要執行的時候經過反射的方式執行後臺做業。
從 IBackgroundJobStore
咱們就能夠猜到以 Abp 框架的套路,他確定會有兩種實現,第一種就是基於內存的 InMemoryBackgroundJobStore
。而第二種呢,就是由 Abp.Zero 模塊所提供的基於數據庫的 BackgroundJobStore
。
IBackgroundJobStore
接口所定義的方法基本上就是增刪改查,沒有什麼複雜的。
public interface IBackgroundJobStore { // 經過 JobId 獲取後臺任務信息 Task<BackgroundJobInfo> GetAsync(long jobId); // 插入一個新的後臺任務信息 Task InsertAsync(BackgroundJobInfo jobInfo); /// <summary> /// Gets waiting jobs. It should get jobs based on these: /// Conditions: !IsAbandoned And NextTryTime <= Clock.Now. /// Order by: Priority DESC, TryCount ASC, NextTryTime ASC. /// Maximum result: <paramref name="maxResultCount"/>. /// </summary> /// <param name="maxResultCount">Maximum result count.</param> Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount); /// <summary> /// Deletes a job. /// </summary> /// <param name="jobInfo">Job information.</param> Task DeleteAsync(BackgroundJobInfo jobInfo); /// <summary> /// Updates a job. /// </summary> /// <param name="jobInfo">Job information.</param> Task UpdateAsync(BackgroundJobInfo jobInfo); }
這裏先從簡單的內存 Store 提及,這個 InMemoryBackgroundJobStore
內部使用了一個並行字典來存儲這些任務信息。
public class InMemoryBackgroundJobStore : IBackgroundJobStore { private readonly ConcurrentDictionary<long, BackgroundJobInfo> _jobs; private long _lastId; public InMemoryBackgroundJobStore() { _jobs = new ConcurrentDictionary<long, BackgroundJobInfo>(); } }
至關簡單,這幾個接口方法基本上就是針對與這個並行字典操做的一層封裝。
public Task<BackgroundJobInfo> GetAsync(long jobId) { return Task.FromResult(_jobs[jobId]); } public Task InsertAsync(BackgroundJobInfo jobInfo) { jobInfo.Id = Interlocked.Increment(ref _lastId); _jobs[jobInfo.Id] = jobInfo; return Task.FromResult(0); } public Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount) { var waitingJobs = _jobs.Values // 首先篩選出再也不執行的後臺任務 .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now) // 第一次根據後臺做業的優先級進行排序,高優先級優先執行 .OrderByDescending(t => t.Priority) // 再根據執行次數排序,執行次數越少的,越靠前 .ThenBy(t => t.TryCount) .ThenBy(t => t.NextTryTime) .Take(maxResultCount) .ToList(); return Task.FromResult(waitingJobs); } public Task DeleteAsync(BackgroundJobInfo jobInfo) { _jobs.TryRemove(jobInfo.Id, out _); return Task.FromResult(0); } public Task UpdateAsync(BackgroundJobInfo jobInfo) { // 若是是再也不執行的任務,刪除 if (jobInfo.IsAbandoned) { return DeleteAsync(jobInfo); } return Task.FromResult(0); }
至於持久化到數據庫,無非是注入一個倉儲,而後針對這個倉儲進行增刪查改的操做罷了,這裏就不在贅述。
後臺做業的優先級定義在 BackgroundJobPriority
枚舉當中,一共有 5 個等級,分別是 Low
、BelowNormal
、Normal
、AboveNormal
、High
,他們從最低到最高排列。