文章信息:html
基於的 ABP vNext 版本:1.0.0數據庫
創做日期:2019 年 10 月 24 日晚緩存
更新日期:暫無安全
ABP vNext 提供了後臺工做者和後臺做業的支持,基本實現與原來的 ABP 框架相似,而且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的後臺做業集成。開發人員在使用這些第三方庫的時候,基本就是開箱即用,不須要作其餘複雜的配置。多線程
後臺做業在系統開發的過程中,是比較經常使用的功能。由於老是有一些長耗時的任務,而這些任務咱們不是當即響應的,例如 Excel 文檔導入、批量發送短信通知等。app
後臺工做者 的話,ABP vNext 的實現就是在 CLR 的 Timer
之上封裝了一層,週期性地執行用戶邏輯。ABP vNext 默認提供的 後臺任務管理器,就是在後臺工做者基礎之上進行的封裝。框架
涉及到後臺任務、後臺工做者的模塊一共有 6 個,它們分別是:async
AbpTimer
就是在裏面實現的。Volo.Abp.BackgroundWorkers :後臺工做者的定義和實現。分佈式
Volo.Abp.BackgroundJobs.RabbitMQ : 基於 RabbitMQ 實現的後臺任務管理器。ide
CLR 爲咱們提供了多種計時器,咱們通常使用的是 System.Threading.Timer
,它是基於 CLR 線程池的一個週期計時器,會根據咱們配置的 Period
(週期) 定時執行。在 CLR 線程池中,全部的 Timer
只有 1 個線程爲其服務。這個線程直到下一個計時器的觸發時間,當下一個 Timer
對象到期時,這個線程就會將 Timer
的回調方法經過 ThreadPool.QueueUserWorkItem()
扔到線程池去執行。
不過這帶來了一個問題,即你的回調方法執行時間超過了計時器的週期,那麼就會形成上一個任務還沒執行完成又開始執行新的任務。
解決這個方法其實很簡單,即啓動以後,將週期設置爲 Timeout.Infinite
,這樣只會執行一次。當回調方法執行完成以後,就設置 dueTime
參數說明下次執行要等待多久,而且週期仍是 Timeout.Infinite
。
ABP vNext 已經爲咱們提供了健壯的計時器,該類型的定義是 AbpTimer
,在內部用到了 volatile
關鍵字和 Monitor
實現 條件變量模式 解決多線程環境下的問題。
public class AbpTimer : ITransientDependency { // 回調事件。 public event EventHandler Elapsed; // 執行週期。 public int Period { get; set; } // 定時器啓動以後就開始運行,默認爲 Fasle。 public bool RunOnStart { get; set; } // 日誌記錄器。 public ILogger<AbpTimer> Logger { get; set; } private readonly Timer _taskTimer; // 定時器是否在執行任務,默認爲 false。 private volatile bool _performingTasks; // 定時器的運行狀態,默認爲 false。 private volatile bool _isRunning; public AbpTimer() { Logger = NullLogger<AbpTimer>.Instance; // 回調函數是 TimerCallBack,執行週期爲永不執行。 _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite); } public void Start(CancellationToken cancellationToken = default) { // 若是傳遞的週期小於等於 0 ,則拋出異常。 if (Period <= 0) { throw new AbpException("Period should be set before starting the timer!"); } // 使用互斥鎖,保證線程安全。 lock (_taskTimer) { // 若是啓動以後就須要立刻執行,則設置爲 0,立刻執行任務,不然會等待 Period 毫秒以後再執行(1 個週期)。 _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite); // 定時器成功運行了。 _isRunning = true; } // 釋放 _taskTimer 的互斥鎖。 } public void Stop(CancellationToken cancellationToken = default) { // 使用互斥鎖。 lock (_taskTimer) { // 將內部定時器設置爲永不執行的狀態。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 檢測當前是否還有正在執行的任務,若是有則等待任務執行完成。 while (_performingTasks) { // 臨時釋放鎖,阻塞當前線程。可是其餘線程能夠獲取 _timer 的互斥鎖。 Monitor.Wait(_taskTimer); } // 須要表示中止狀態,因此標記狀態爲 false。 _isRunning = false; } } private void TimerCallBack(object state) { lock (_taskTimer) { // 若是有任務正在運行,或者內部定時器已經中止了,則不作任何事情。 if (!_isRunning || _performingTasks) { return; } // 臨時中止內部定時器。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 代表立刻須要執行任務了。 _performingTasks = true; } try { // 調用綁定的事件。 Elapsed.InvokeSafely(this, new EventArgs()); } catch { // 注意,這裏將會吞噬異常。 } finally { lock (_taskTimer) { // 任務執行完成,更改狀態。 _performingTasks = false; // 若是定時器還在運行,沒有被中止,則啓動下一個 Period 週期。 if (_isRunning) { _taskTimer.Change(Period, Timeout.Infinite); } // 解除由於釋放鎖而阻塞的線程。 // 若是已經調用了 Stop,則會喚醒那個由於 Wait 阻塞的線程,就會使 _isRunning 置爲 false。 Monitor.Pulse(_taskTimer); } } } }
這裏對 _performingTasks
和 _isRunning
字段設置爲 volatile
防止指令重排和寄存器緩存。這是由於在 Stop
方法內部使用到的 _performingTasks
可能會被優化,因此將該字段設置爲了易失的。
IRunnable
接口ABP vNext 爲任務的啓動和中止,抽象了一個 IRunnable
接口。雖然描述說的是對線程的行爲進行抽象,但千萬千萬不要手動調用 Thread.Abort()
。關於 Thread.Abort()
的壞處,這裏再也不多加贅述,能夠參考 這篇文章 的描述,或者搜索其餘的相關文章。
public interface IRunnable { // 啓動這個服務。 Task StartAsync(CancellationToken cancellationToken = default); /// <summary> /// 中止這個服務。 /// </summary> Task StopAsync(CancellationToken cancellationToken = default); }
後臺工做者的模塊行爲比較簡單,它定義了在應用程序初始化和銷燬時的行爲。在初始化時,後臺工做者管理器 得到全部 後臺工做者,並開始啓動它們。在銷燬時,後臺工做者管理器得到全部後臺工做者,並開始中止他們,這樣纔可以作到優雅退出。
[DependsOn( typeof(AbpThreadingModule) )] public class AbpBackgroundWorkersModule : AbpModule { public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value; // 若是啓用了後臺工做者,那麼得到後臺工做者管理器的實例,並調用 StartAsync 啓動全部後臺工做者。 if (options.IsEnabled) { AsyncHelper.RunSync( () => context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .StartAsync() ); } } public override void OnApplicationShutdown(ApplicationShutdownContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value; // 若是啓用了後臺工做者,那麼得到後臺工做者管理器的實例,並調用 StopAsync 中止全部後臺工做者。 if (options.IsEnabled) { AsyncHelper.RunSync( () => context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .StopAsync() ); } } }
首先看看 IBackgroundWorker
接口的定義,是空的。不過繼承了 ISingletonDependency
接口,說明咱們的每一個後臺工做者都是 單例 的。
/// <summary> /// 在後臺運行,執行某些任務的工做程序(線程)的接口定義。 /// </summary> public interface IBackgroundWorker : IRunnable, ISingletonDependency { }
ABP vNext 爲咱們定義了一個抽象的後臺工做者類型 BackgroundWorkerBase
,這個基類的設計目的是提供一些經常使用組件(和 ApplicationService
同樣)。
public abstract class BackgroundWorkerBase : IBackgroundWorker { //TODO: Add UOW, Localization and other useful properties..? //TODO: 是否應該提供工做單元、本地化以及其餘經常使用的屬性? public ILogger<BackgroundWorkerBase> Logger { protected get; set; } protected BackgroundWorkerBase() { Logger = NullLogger<BackgroundWorkerBase>.Instance; } public virtual Task StartAsync(CancellationToken cancellationToken = default) { Logger.LogDebug("Started background worker: " + ToString()); return Task.CompletedTask; } public virtual Task StopAsync(CancellationToken cancellationToken = default) { Logger.LogDebug("Stopped background worker: " + ToString()); return Task.CompletedTask; } public override string ToString() { return GetType().FullName; } }
ABP vNext 內部只有一個默認的後臺工做者實現 PeriodicBackgroundWorkerBase
。從名字上來看,意思是就是週期執行的後臺工做者,內部就是用的 AbpTimer
來實現,ABP vNext 將其包裝起來是爲了實現統一的模式(後臺工做者)。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase { protected readonly AbpTimer Timer; // 也就意味着子類必須在其構造函數,指定 timer 的執行週期。 protected PeriodicBackgroundWorkerBase(AbpTimer timer) { Timer = timer; Timer.Elapsed += Timer_Elapsed; } // 啓動後臺工做者。 public override async Task StartAsync(CancellationToken cancellationToken = default) { await base.StartAsync(cancellationToken); Timer.Start(cancellationToken); } // 中止後臺工做者。 public override async Task StopAsync(CancellationToken cancellationToken = default) { Timer.Stop(cancellationToken); await base.StopAsync(cancellationToken); } // Timer 關聯的週期事件,之因此不直接掛載 DoWork,是爲了捕獲異常。 private void Timer_Elapsed(object sender, System.EventArgs e) { try { DoWork(); } catch (Exception ex) { Logger.LogException(ex); } } // 你要週期執行的任務。 protected abstract void DoWork(); }
咱們若是要實現本身的後臺工做者,只須要繼承該類,實現 DoWork()
方法便可。
public class TestWorker : PeriodicBackgroundWorkerBase { public TestWorker(AbpTimer timer) : base(timer) { // 每五分鐘執行一次。 timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds; } protected override void DoWork() { Console.WriteLine("後臺工做者被執行了。"); } }
而後在咱們本身模塊的 OnPreApplicationInitialization()
方法內解析出後臺做業管理器(IBackgroundWorkerManager
),調用它的 Add()
方法,將咱們定義的 TestWorker
添加到管理器當中便可。
全部的後臺工做者都是經過 IBackgroundWorkerManager
進行管理的,它提供了 StartAsync()
、StopAsync()
、Add()
方法。前面兩個方法就是 IRunnable
接口定義的,後臺工做者管理器直接集成了該接口,後面的 Add()
方法就是用來動態添加咱們的後臺工做者。
後臺工做者管理器的默認實現是 BackgroundWorkerManager
類型,它內部作的事情很簡單,就是維護一個後臺工做者集合。每當調用 StartAsync()
或 StopAsync()
方法的時候,都從這個集合遍歷後臺工做者,執行他們的啓動和中止方法。
這裏值得注意的一點是,當咱們調用 Add()
方法添加了一個後臺工做者以後,後臺工做者管理器就會啓動這個後臺工做者。
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable { protected bool IsRunning { get; private set; } private bool _isDisposed; private readonly List<IBackgroundWorker> _backgroundWorkers; public BackgroundWorkerManager() { _backgroundWorkers = new List<IBackgroundWorker>(); } public virtual void Add(IBackgroundWorker worker) { _backgroundWorkers.Add(worker); // 若是當先後臺工做者管理器還處於運行狀態,則調用工做者的 StartAsync() 方法啓動。 if (IsRunning) { AsyncHelper.RunSync( () => worker.StartAsync() ); } } public virtual void Dispose() { if (_isDisposed) { return; } _isDisposed = true; //TODO: ??? } // 啓動,則遍歷集合啓動。 public virtual async Task StartAsync(CancellationToken cancellationToken = default) { IsRunning = true; foreach (var worker in _backgroundWorkers) { await worker.StartAsync(cancellationToken); } } // 中止, 則遍歷集合中止。 public virtual async Task StopAsync(CancellationToken cancellationToken = default) { IsRunning = false; foreach (var worker in _backgroundWorkers) { await worker.StopAsync(cancellationToken); } } }
上述代碼其實存在一個問題,即後臺工做者被釋放之後,是否還能執行 Add()
操做。參考我 以前的文章 ,其實當對象被釋放以後,就應該拋出 ObjectDisposedException
異常。
比起後臺工做者,咱們執行一次性任務的時候,通常會使用後臺做業進行處理。比起只能設置固定週期的 PeriodicBackgroundWorkerBase
,集成了 Hangfire 的後臺做業管理器,可以讓咱們使用 Cron 表達式,更加靈活地設置任務的執行週期。
關於後臺做業的模塊,咱們須要說道兩處。第一處是位於 Volo.Abp.BackgroundJobs.Abstractions 項目的 AbpBackgroundJobsAbstractionsModule
,第二齣則是位於 Volo.Abp.BackgroundJobs 項目的 AbpBackgroundJobsModule
。
AbpBackgroundJobsAbstractionsModule
的主要行爲是將符合條件的後臺做業,添加到 AbpBackgroundJobOptions
配置當中,以便後續進行使用。
public override void PreConfigureServices(ServiceConfigurationContext context) { RegisterJobs(context.Services); } private static void RegisterJobs(IServiceCollection services) { var jobTypes = new List<Type>(); // 若是註冊的類型符合 IBackgroundJob<> 泛型,則添加到集合當中。 services.OnRegistred(context => { if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>))) { jobTypes.Add(context.ImplementationType); } }); services.Configure<AbpBackgroundJobOptions>(options => { // 將數據賦值給配置類。 foreach (var jobType in jobTypes) { options.AddJob(jobType); } }); }
Volo.Abp.BackgroundJobs 內部是 ABP vNext 爲咱們提供的 默認後臺做業管理器,這個後臺做業管理器 本質上是一個後臺工做者。
這個後臺工做者會週期性(取決於 AbpBackgroundJobWorkerOptions.JobPollPeriod
值,默認爲 5 秒種)地從 IBackgroundJobStore
撈出一堆後臺任務,而且在後臺執行。至於每次執行多少個後臺任務,這也取決於 AbpBackgroundJobWorkerOptions.MaxJobFetchCount
的值,默認值是 1000 個。
注意:
這裏的 Options 類是
AbpBackgroundJobWorkerOptions
,別和AbpBackgroundWorkerOptions
混淆了。
因此在 AbpBackgroundJobsModule
模塊裏面,只作了一件事情,就是將負責後臺做業的後臺工做者,添加到後臺工做者管理器種,並開始週期性地執行。
public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value; if (options.IsJobExecutionEnabled) { // 得到後臺工做者管理器,並將負責後臺做業的工做者添加進去。 context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>() ); } }
在上一節裏面看到,只要是實現 IBackgroundJob<TArgs>
類型的都視爲一個後臺做業。這個後臺做業接口,只定義了一個行爲,那就是執行(Execute(TArgs)
)。這裏的 TArgs
泛型做爲執行後臺做業時,須要傳遞的參數類型。
// 由於是傳入的參數,因此泛型參數是逆變的。 public interface IBackgroundJob<in TArgs> { void Execute(TArgs args); }
檢查源碼,發現 ABP vNext 的郵箱模塊定義了一個郵件發送任務 BackgroundEmailSendingJob
,它的實現大概以下。
public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency { // ... public override void Execute(BackgroundEmailSendingJobArgs args) { AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml)); } }
後臺做業都是經過一個後臺做業管理器(IBackgroundJobManager
)進行管理的,這個接口定義了一個入隊方法(EnqueueAsync()
),注意,咱們的後臺做業在入隊後,不是立刻執行的。
說一下這個入隊處理邏輯:
BackgroundJobNameAttribute
特性,那麼任務的名稱就是參數類型的 FullName
。)BackgroundJobInfo
對象。IBackgroundJobStore
持久化任務信息。public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 獲取任務名稱。 var jobName = BackgroundJobNameAttribute.GetName<TArgs>(); var jobId = await EnqueueAsync(jobName, args, priority, delay); return jobId.ToString(); } protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { var jobInfo = new BackgroundJobInfo { Id = GuidGenerator.Create(), JobName = jobName, // 經過序列化器,序列化參數值,方便存儲。這裏內部其實使用的是 JSON.NET 進行序列化。 JobArgs = Serializer.Serialize(args), Priority = priority, CreationTime = Clock.Now, NextTryTime = Clock.Now }; // 若是任務有執行延遲,則任務的初始執行時間要加上這個延遲。 if (delay.HasValue) { jobInfo.NextTryTime = Clock.Now.Add(delay.Value); } // 持久化任務信息,方便後面執行後臺做業的工做者可以取到。 await Store.InsertAsync(jobInfo); return jobInfo.Id; }
BackgroundJobNameAttribute
相關的方法:
public static string GetName<TJobArgs>() { return GetName(typeof(TJobArgs)); } public static string GetName([NotNull] Type jobArgsType) { Check.NotNull(jobArgsType, nameof(jobArgsType)); // 判斷參數類型上面是否標註了特性,而且特性實現了 IBackgroundJobNameProvider 接口。 return jobArgsType .GetCustomAttributes(true) .OfType<IBackgroundJobNameProvider>() .FirstOrDefault() ?.Name // 拿不到名字,則使用類型的 FullName。 ?? jobArgsType.FullName; }
後臺做業的存儲默認是放在內存的,這點能夠從 InMemoryBackgroundJobStore
類型實現看出來。在它的內部使用了一個並行字典,經過做業的 Guid 與做業進行關聯綁定。
除了內存實現,在 Volo.Abp.BackgroundJobs.Domain 模塊還有一個 BackgroundJobStore
實現,基本套路與 SettingStore
同樣,都是存儲到數據庫裏面。
public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency { protected IBackgroundJobRepository BackgroundJobRepository { get; } // ... public BackgroundJobInfo Find(Guid jobId) { return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>( BackgroundJobRepository.Find(jobId) ); } // ... public void Insert(BackgroundJobInfo jobInfo) { BackgroundJobRepository.Insert( ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo) ); } // ... }
默認的後臺做業管理器是經過一個後臺工做者來執行後臺任務的,這個實現叫作 BackgroundJobWorker
,這個後臺工做者的生命週期也是單例的。後臺做業的具體執行邏輯裏面,涉及到了如下幾個類型的交互。
類型 | 做用 |
---|---|
AbpBackgroundJobOptions |
提供每一個後臺任務的配置信息,包括任務的類型、參數類型、任務名稱數據。 |
AbpBackgroundJobWorkerOptions |
提供後臺做業工做者的配置信息,例如每一個週期 最大執行的做業數量、後臺 工做者的 執行週期、做業執行 超時時間 等。 |
BackgroundJobConfiguration |
後臺任務的配置信息,做用是將持久化存儲的做業信息與運行時類型進行綁定 和實例化,以便 ABP vNext 來執行具體的任務。 |
IBackgroundJobExecuter |
後臺做業的執行器,當咱們從持久化存儲獲取到後臺做業信息時,將會經過 這個執行器來執行具體的後臺做業。 |
IBackgroundJobSerializer |
後臺做業序列化器,用於後臺做業持久化時進行序列化的工具,默認採用的 是 JSON.NET 進行實現。 |
JobExecutionContext |
執行器在執行後臺做業時,是經過這個上下文參數進行執行的,在這個上下 文內部,包含了後臺做業的具體類型、後臺做業的參數值。 |
IBackgroundJobStore |
前面已經講過了,這個是用於後臺做業的持久化存儲,默認實現是存儲在內存。 |
BackgroundJobPriority |
後臺做業的執行優先級定義,ABP vNext 在執行後臺任務時,會根據任務的優 先級進行排序,以便在後面執行的時候優先級高的任務先執行。 |
咱們來按照邏輯順序走一遍它的實現,首前後臺做業的執行工做者會從持久化存儲內,獲取 MaxJobFetchCount
個任務用於執行。從持久化存儲獲取後臺做業信息(BackgroundJobInfo
),是由 IBackgroundJobStore
提供的。
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>(); var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount); // 不存在任何後臺做業,則直接結束本次調用。 if (!waitingJobs.Any()) { return; }
InMemoryBackgroundJobStore
的相關實現:
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount) { return _jobs.Values .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now) .OrderByDescending(t => t.Priority) .ThenBy(t => t.TryCount) .ThenBy(t => t.NextTryTime) .Take(maxResultCount) .ToList(); }
上面的代碼能夠看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,而後根據任務的優先級從高到低進行排序。重試次數少的優先執行,預計執行時間越早的越先執行。最後從這些數據中,篩選出 maxResultCount
結果並返回。
說到這裏,咱們來看一下這個 NextTryTime
是如何被計算出來的?回想起最開始的後臺做業管理器,咱們在添加一個後臺任務的時候,就會設置這個後臺任務的 預計執行時間。第一個任務被添加到執行隊列中時,它的值通常是 Clock.Now
,也就是它被添加到隊列的時間。
不過 ABP vNext 爲了讓那些常常執行失敗的任務,有比較低的優先級再執行,就在每次任務執行失敗以後,會將 NextTryTime
的值指數級進行增長。這塊代碼能夠在 CalculateNextTryTime
裏面看到,也就是說某個任務的執行 失敗次數越高,那麼它下一次的預期執行時間就會越遠。
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock) { // 通常來講,這個 DefaultWaitFactor 因子的值是 2.0 。 var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行掛鉤。 var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ?? clock.Now.AddSeconds(nextWaitDuration); if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout) { return null; } return nextTryDate; }
當預期的執行時間都超過 DefaultTimeout
的超時時間時(默認爲 2 天),說明這個任務確實沒救了,就不要再執行了。
咱們以前說到,從 IBackgroundJobStore
拿到了須要執行的後臺任務信息集合,接下來咱們就要開始執行後臺任務了。
foreach (var jobInfo in waitingJobs) { jobInfo.TryCount++; jobInfo.LastTryTime = clock.Now; try { // 根據任務名稱獲取任務的配置參數。 var jobConfiguration = JobOptions.GetJob(jobInfo.JobName); // 根據配置裏面存儲的任務類型,將參數值進行反序列化。 var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType); // 構造一個新的執行上下文,讓執行器執行任務。 var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs); try { jobExecuter.Execute(context); // 若是任務執行成功則刪除該任務。 store.Delete(jobInfo.Id); } catch (BackgroundJobExecutionException) { // 發生任務執行失敗異常時,根據指定的公式計算下一次的執行時間。 var nextTryTime = CalculateNextTryTime(jobInfo, clock); if (nextTryTime.HasValue) { jobInfo.NextTryTime = nextTryTime.Value; } else { // 超過超時時間的時候,公式計算函數返回 null,該任務置爲廢棄任務。 jobInfo.IsAbandoned = true; } TryUpdate(store, jobInfo); } } catch (Exception ex) { // 執行過程當中,產生了未知異常,設置爲廢棄任務,並打印日誌。 Logger.LogException(ex); jobInfo.IsAbandoned = true; TryUpdate(store, jobInfo); } }
執行後臺任務的時候基本分爲 5 步,它們分別是:
至於執行器裏面的真正執行操做,你都拿到了參數值和任務類型了。就能夠經過類型用 IoC 獲取後臺任務對象的實例,而後經過反射匹配方法簽名,在實例上調用這個方法傳入參數便可。
public virtual void Execute(JobExecutionContext context) { // 構造具體的後臺做業實例對象。 var job = context.ServiceProvider.GetService(context.JobType); if (job == null) { throw new AbpException("The job type is not registered to DI: " + context.JobType); } // 得到須要執行的方法簽名。 var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute)); if (jobExecuteMethod == null) { throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType); } try { // 直接經過 MethodInfo 的 Invoke 方法調用,傳入具體的實例對象和參數值便可。 jobExecuteMethod.Invoke(job, new[] { context.JobArgs }); } catch (Exception ex) { Logger.LogException(ex); // 若是是執行方法內的異常,則包裝進行處理,而後拋出。 throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex) { JobType = context.JobType.AssemblyQualifiedName, JobArgs = context.JobArgs }; } }
ABP vNext 對於 Hangfire 的集成代碼分佈在 Volo.Abp.HangFire 和 Volo.Abp.BackgroundJobs.HangFire 模塊內部,前者是在模塊配置裏面,調用 Hangfire 庫的相關方法,注入組件到 IoC 容器當中。後者則是對後臺做業進行了適配處理,替換了默認的 IBackgroundJobManager
實現。
在 AbpHangfireModule
模塊內部,經過工廠建立出來一個 BackgroudJobServer
實例,並將它的生命週期與應用程序的生命週期進行綁定,以便進行銷燬處理。
public class AbpHangfireModule : AbpModule { private BackgroundJobServer _backgroundJobServer; public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.AddHangfire(configuration => { context.Services.ExecutePreConfiguredActions(configuration); }); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value; _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider); } public override void OnApplicationShutdown(ApplicationShutdownContext context) { //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown _backgroundJobServer.SendStop(); _backgroundJobServer.Dispose(); } }
咱們直奔主題,看一下基於 Hangfire 的後臺做業管理器是怎麼實現的。
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency { public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 若是沒有延遲參數,則直接經過 Enqueue() 方法扔進執行對了。 if (!delay.HasValue) { return Task.FromResult( BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args) ) ); } else { return Task.FromResult( BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args), delay.Value ) ); } }
上述代碼中使用 HangfireJobExecutionAdapter
進行了一個適配操做,由於 Hangfire 要將一個後臺任務扔進隊列執行,不是用 TArgs
就能解決的。
轉到這個適配器定義,提供了一個 Execute(TArgs)
方法,當被添加到 Hangfire 隊列執行的時候。實際 Hangfire 會調用適配器的 Excetue(TArgs)
方法,而後內部仍是使用的 IBackgroundJobExecuter
來執行具體定義的任務。
public class HangfireJobExecutionAdapter<TArgs> { protected AbpBackgroundJobOptions Options { get; } protected IServiceScopeFactory ServiceScopeFactory { get; } protected IBackgroundJobExecuter JobExecuter { get; } public HangfireJobExecutionAdapter( IOptions<AbpBackgroundJobOptions> options, IBackgroundJobExecuter jobExecuter, IServiceScopeFactory serviceScopeFactory) { JobExecuter = jobExecuter; ServiceScopeFactory = serviceScopeFactory; Options = options.Value; } public void Execute(TArgs args) { using (var scope = ServiceScopeFactory.CreateScope()) { var jobType = Options.GetJob(typeof(TArgs)).JobType; var context = new JobExecutionContext(scope.ServiceProvider, jobType, args); JobExecuter.Execute(context); } } }
基於 RabbitMQ 的後臺做業實現,我想放在分佈式事件總線裏面,對其一塊兒進行講解。
ABP vNext 爲咱們提供了多種後臺做業管理器的實現,你能夠根據本身的需求選用不一樣的後臺做業管理器,又或者是本身動手造輪子。
須要看其餘的 ABP vNext 相關文章?點擊我 便可跳轉到總目錄。