[Abp vNext 源碼分析] - 12. 後臺做業與後臺工做者

1、簡要說明

文章信息:html

基於的 ABP vNext 版本:1.0.0數據庫

創做日期:2019 年 10 月 24 日晚緩存

更新日期:暫無安全

ABP vNext 提供了後臺工做者和後臺做業的支持,基本實現與原來的 ABP 框架相似,而且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的後臺做業集成。開發人員在使用這些第三方庫的時候,基本就是開箱即用,不須要作其餘複雜的配置。多線程

後臺做業在系統開發的過程中,是比較經常使用的功能。由於老是有一些長耗時的任務,而這些任務咱們不是當即響應的,例如 Excel 文檔導入、批量發送短信通知等。app

後臺工做者 的話,ABP vNext 的實現就是在 CLR 的 Timer 之上封裝了一層,週期性地執行用戶邏輯。ABP vNext 默認提供的 後臺任務管理器,就是在後臺工做者基礎之上進行的封裝。框架

涉及到後臺任務、後臺工做者的模塊一共有 6 個,它們分別是:async

  • Volo.Abp.Threading :提供了一些經常使用的線程組件,其中 AbpTimer 就是在裏面實現的。
  • Volo.Abp.BackgroundWorkers :後臺工做者的定義和實現。分佈式

  • Volo.Abp.BackgroundJobs.Abstractions :後臺任務的一些共有定義。
  • Volo.Abp.BackgroundJobs :默認的後臺任務管理器實現。
  • Volo.Abp.BackgroundJobs.HangFire :基於 Hangfire 庫實現的後臺任務管理器。
  • Volo.Abp.BackgroundJobs.RabbitMQ : 基於 RabbitMQ 實現的後臺任務管理器。ide

2、源碼分析

線程組件

健壯的計時器

CLR 爲咱們提供了多種計時器,咱們通常使用的是 System.Threading.Timer ,它是基於 CLR 線程池的一個週期計時器,會根據咱們配置的 Period (週期) 定時執行。在 CLR 線程池中,全部的 Timer 只有 1 個線程爲其服務。這個線程直到下一個計時器的觸發時間,當下一個 Timer 對象到期時,這個線程就會將 Timer 的回調方法經過 ThreadPool.QueueUserWorkItem() 扔到線程池去執行。

不過這帶來了一個問題,即你的回調方法執行時間超過了計時器的週期,那麼就會形成上一個任務還沒執行完成又開始執行新的任務。

解決這個方法其實很簡單,即啓動以後,將週期設置爲 Timeout.Infinite ,這樣只會執行一次。當回調方法執行完成以後,就設置 dueTime 參數說明下次執行要等待多久,而且週期仍是 Timeout.Infinite

ABP vNext 已經爲咱們提供了健壯的計時器,該類型的定義是 AbpTimer ,在內部用到了 volatile 關鍵字和 Monitor 實現 條件變量模式 解決多線程環境下的問題。

public class AbpTimer : ITransientDependency
{
    // 回調事件。
    public event EventHandler Elapsed;

    // 執行週期。
    public int Period { get; set; }

    // 定時器啓動以後就開始運行,默認爲 Fasle。
    public bool RunOnStart { get; set; }

    // 日誌記錄器。
    public ILogger<AbpTimer> Logger { get; set; }

    private readonly Timer _taskTimer;
    // 定時器是否在執行任務,默認爲 false。
    private volatile bool _performingTasks;
    // 定時器的運行狀態,默認爲 false。
    private volatile bool _isRunning;

    public AbpTimer()
    {
        Logger = NullLogger<AbpTimer>.Instance;

        // 回調函數是 TimerCallBack,執行週期爲永不執行。
        _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start(CancellationToken cancellationToken = default)
    {
        // 若是傳遞的週期小於等於 0 ,則拋出異常。
        if (Period <= 0)
        {
            throw new AbpException("Period should be set before starting the timer!");
        }

        // 使用互斥鎖,保證線程安全。
        lock (_taskTimer)
        {
            // 若是啓動以後就須要立刻執行,則設置爲 0,立刻執行任務,不然會等待 Period 毫秒以後再執行(1 個週期)。
            _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
            // 定時器成功運行了。
            _isRunning = true;
        }
        // 釋放 _taskTimer 的互斥鎖。
    }

    public void Stop(CancellationToken cancellationToken = default)
    {
        // 使用互斥鎖。
        lock (_taskTimer)
        {
            // 將內部定時器設置爲永不執行的狀態。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // 檢測當前是否還有正在執行的任務,若是有則等待任務執行完成。
            while (_performingTasks)
            {
                // 臨時釋放鎖,阻塞當前線程。可是其餘線程能夠獲取 _timer 的互斥鎖。
                Monitor.Wait(_taskTimer);
            }

            // 須要表示中止狀態,因此標記狀態爲 false。
            _isRunning = false;
        }
    }

    private void TimerCallBack(object state)
    {
        lock (_taskTimer)
        {
            // 若是有任務正在運行,或者內部定時器已經中止了,則不作任何事情。
            if (!_isRunning || _performingTasks)
            {
                return;
            }

            // 臨時中止內部定時器。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
            // 代表立刻須要執行任務了。
            _performingTasks = true;
        }

        try
        {
            // 調用綁定的事件。
            Elapsed.InvokeSafely(this, new EventArgs());
        }
        catch
        {
            // 注意,這裏將會吞噬異常。
        }
        finally
        {
            lock (_taskTimer)
            {
                // 任務執行完成,更改狀態。
                _performingTasks = false;

                // 若是定時器還在運行,沒有被中止,則啓動下一個 Period 週期。
                if (_isRunning)
                {
                    _taskTimer.Change(Period, Timeout.Infinite);
                }

                // 解除由於釋放鎖而阻塞的線程。
                // 若是已經調用了 Stop,則會喚醒那個由於 Wait 阻塞的線程,就會使 _isRunning 置爲 false。
                Monitor.Pulse(_taskTimer);
            }
        }
    }
}

這裏對 _performingTasks_isRunning 字段設置爲 volatile 防止指令重排和寄存器緩存。這是由於在 Stop 方法內部使用到的 _performingTasks 可能會被優化,因此將該字段設置爲了易失的。

IRunnable 接口

ABP vNext 爲任務的啓動和中止,抽象了一個 IRunnable 接口。雖然描述說的是對線程的行爲進行抽象,但千萬千萬不要手動調用 Thread.Abort() 。關於 Thread.Abort() 的壞處,這裏再也不多加贅述,能夠參考 這篇文章 的描述,或者搜索其餘的相關文章。

public interface IRunnable
{
    // 啓動這個服務。
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>
    /// 中止這個服務。
    /// </summary>
    Task StopAsync(CancellationToken cancellationToken = default);
}

後臺工做者

模塊的構造

後臺工做者的模塊行爲比較簡單,它定義了在應用程序初始化和銷燬時的行爲。在初始化時,後臺工做者管理器 得到全部 後臺工做者,並開始啓動它們。在銷燬時,後臺工做者管理器得到全部後臺工做者,並開始中止他們,這樣纔可以作到優雅退出。

[DependsOn(
    typeof(AbpThreadingModule)
    )]
public class AbpBackgroundWorkersModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 若是啓用了後臺工做者,那麼得到後臺工做者管理器的實例,並調用 StartAsync 啓動全部後臺工做者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StartAsync()
            );
        }
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 若是啓用了後臺工做者,那麼得到後臺工做者管理器的實例,並調用 StopAsync 中止全部後臺工做者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StopAsync()
            );
        }
    }
}

後臺工做者的定義

首先看看 IBackgroundWorker 接口的定義,是空的。不過繼承了 ISingletonDependency 接口,說明咱們的每一個後臺工做者都是 單例 的。

/// <summary>
/// 在後臺運行,執行某些任務的工做程序(線程)的接口定義。
/// </summary>
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{

}

ABP vNext 爲咱們定義了一個抽象的後臺工做者類型 BackgroundWorkerBase,這個基類的設計目的是提供一些經常使用組件(和 ApplicationService 同樣)。

public abstract class BackgroundWorkerBase : IBackgroundWorker
{
    //TODO: Add UOW, Localization and other useful properties..?
    //TODO: 是否應該提供工做單元、本地化以及其餘經常使用的屬性?

    public ILogger<BackgroundWorkerBase> Logger { protected get; set; }

    protected BackgroundWorkerBase()
    {
        Logger = NullLogger<BackgroundWorkerBase>.Instance;
    }
    
    public virtual Task StartAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Started background worker: " + ToString());
        return Task.CompletedTask;
    }

    public virtual Task StopAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Stopped background worker: " + ToString());
        return Task.CompletedTask;
    }

    public override string ToString()
    {
        return GetType().FullName;
    }
}

ABP vNext 內部只有一個默認的後臺工做者實現 PeriodicBackgroundWorkerBase。從名字上來看,意思是就是週期執行的後臺工做者,內部就是用的 AbpTimer 來實現,ABP vNext 將其包裝起來是爲了實現統一的模式(後臺工做者)。

public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
    protected readonly AbpTimer Timer;

    // 也就意味着子類必須在其構造函數,指定 timer 的執行週期。
    protected PeriodicBackgroundWorkerBase(AbpTimer timer)
    {
        Timer = timer;
        Timer.Elapsed += Timer_Elapsed;
    }

    // 啓動後臺工做者。
    public override async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await base.StartAsync(cancellationToken);
        Timer.Start(cancellationToken);
    }

    // 中止後臺工做者。
    public override async Task StopAsync(CancellationToken cancellationToken = default)
    {
        Timer.Stop(cancellationToken);
        await base.StopAsync(cancellationToken);
    }
    
    // Timer 關聯的週期事件,之因此不直接掛載 DoWork,是爲了捕獲異常。
    private void Timer_Elapsed(object sender, System.EventArgs e)
    {
        try
        {
            DoWork();
        }
        catch (Exception ex)
        {
            Logger.LogException(ex);
        }
    }

    // 你要週期執行的任務。
    protected abstract void DoWork();
}

咱們若是要實現本身的後臺工做者,只須要繼承該類,實現 DoWork() 方法便可。

public class TestWorker : PeriodicBackgroundWorkerBase
{
    public TestWorker(AbpTimer timer) : base(timer)
    {
        // 每五分鐘執行一次。
        timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
    }

    protected override void DoWork()
    {
        Console.WriteLine("後臺工做者被執行了。");
    }
}

而後在咱們本身模塊的 OnPreApplicationInitialization() 方法內解析出後臺做業管理器(IBackgroundWorkerManager),調用它的 Add() 方法,將咱們定義的 TestWorker 添加到管理器當中便可。

後臺工做者管理器

全部的後臺工做者都是經過 IBackgroundWorkerManager 進行管理的,它提供了 StartAsync()StopAsync()Add() 方法。前面兩個方法就是 IRunnable 接口定義的,後臺工做者管理器直接集成了該接口,後面的 Add() 方法就是用來動態添加咱們的後臺工做者。

後臺工做者管理器的默認實現是 BackgroundWorkerManager 類型,它內部作的事情很簡單,就是維護一個後臺工做者集合。每當調用 StartAsync()StopAsync() 方法的時候,都從這個集合遍歷後臺工做者,執行他們的啓動和中止方法。

這裏值得注意的一點是,當咱們調用 Add() 方法添加了一個後臺工做者以後,後臺工做者管理器就會啓動這個後臺工做者。

public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
    protected bool IsRunning { get; private set; }

    private bool _isDisposed;

    private readonly List<IBackgroundWorker> _backgroundWorkers;

    public BackgroundWorkerManager()
    {
        _backgroundWorkers = new List<IBackgroundWorker>();
    }

    public virtual void Add(IBackgroundWorker worker)
    {
        _backgroundWorkers.Add(worker);

        // 若是當先後臺工做者管理器還處於運行狀態,則調用工做者的 StartAsync() 方法啓動。
        if (IsRunning)
        {
            AsyncHelper.RunSync(
                () => worker.StartAsync()
            );
        }
    }

    public virtual void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        //TODO: ???
    }

    // 啓動,則遍歷集合啓動。
    public virtual async Task StartAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = true;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StartAsync(cancellationToken);
        }
    }

    // 中止, 則遍歷集合中止。
    public virtual async Task StopAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = false;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StopAsync(cancellationToken);
        }
    }
}

上述代碼其實存在一個問題,即後臺工做者被釋放之後,是否還能執行 Add() 操做。參考我 以前的文章 ,其實當對象被釋放以後,就應該拋出 ObjectDisposedException 異常。

後臺做業

比起後臺工做者,咱們執行一次性任務的時候,通常會使用後臺做業進行處理。比起只能設置固定週期的 PeriodicBackgroundWorkerBase ,集成了 Hangfire 的後臺做業管理器,可以讓咱們使用 Cron 表達式,更加靈活地設置任務的執行週期。

模塊的構造

關於後臺做業的模塊,咱們須要說道兩處。第一處是位於 Volo.Abp.BackgroundJobs.Abstractions 項目的 AbpBackgroundJobsAbstractionsModule ,第二齣則是位於 Volo.Abp.BackgroundJobs 項目的 AbpBackgroundJobsModule

AbpBackgroundJobsAbstractionsModule 的主要行爲是將符合條件的後臺做業,添加到 AbpBackgroundJobOptions 配置當中,以便後續進行使用。

public override void PreConfigureServices(ServiceConfigurationContext context)
{
    RegisterJobs(context.Services);
}

private static void RegisterJobs(IServiceCollection services)
{
    var jobTypes = new List<Type>();

    // 若是註冊的類型符合 IBackgroundJob<> 泛型,則添加到集合當中。
    services.OnRegistred(context =>
    {
        if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))
        {
            jobTypes.Add(context.ImplementationType);
        }
    });

    services.Configure<AbpBackgroundJobOptions>(options =>
    {
        // 將數據賦值給配置類。
        foreach (var jobType in jobTypes)
        {
            options.AddJob(jobType);
        }
    });
}

Volo.Abp.BackgroundJobs 內部是 ABP vNext 爲咱們提供的 默認後臺做業管理器,這個後臺做業管理器 本質上是一個後臺工做者

這個後臺工做者會週期性(取決於 AbpBackgroundJobWorkerOptions.JobPollPeriod 值,默認爲 5 秒種)地從 IBackgroundJobStore 撈出一堆後臺任務,而且在後臺執行。至於每次執行多少個後臺任務,這也取決於 AbpBackgroundJobWorkerOptions.MaxJobFetchCount 的值,默認值是 1000 個。

注意:

這裏的 Options 類是 AbpBackgroundJobWorkerOptions,別和 AbpBackgroundWorkerOptions 混淆了。

因此在 AbpBackgroundJobsModule 模塊裏面,只作了一件事情,就是將負責後臺做業的後臺工做者,添加到後臺工做者管理器種,並開始週期性地執行。

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
    var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
    if (options.IsJobExecutionEnabled)
    {
        // 得到後臺工做者管理器,並將負責後臺做業的工做者添加進去。
        context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()
            );
    }
}

後臺做業的定義

在上一節裏面看到,只要是實現 IBackgroundJob<TArgs> 類型的都視爲一個後臺做業。這個後臺做業接口,只定義了一個行爲,那就是執行(Execute(TArgs))。這裏的 TArgs 泛型做爲執行後臺做業時,須要傳遞的參數類型。

// 由於是傳入的參數,因此泛型參數是逆變的。
public interface IBackgroundJob<in TArgs>
{
    void Execute(TArgs args);
}

檢查源碼,發現 ABP vNext 的郵箱模塊定義了一個郵件發送任務 BackgroundEmailSendingJob,它的實現大概以下。

public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency
{
    // ...
    
    public override void Execute(BackgroundEmailSendingJobArgs args)
    {
        AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));
    }
}

後臺做業管理器

後臺做業都是經過一個後臺做業管理器(IBackgroundJobManager)進行管理的,這個接口定義了一個入隊方法(EnqueueAsync()),注意,咱們的後臺做業在入隊後,不是立刻執行的。

說一下這個入隊處理邏輯:

  1. 首先咱們會經過參數的類型,獲取到任務的名稱。(假設任務上面沒有標註 BackgroundJobNameAttribute 特性,那麼任務的名稱就是參數類型的 FullName 。)
  2. 構造一個 BackgroundJobInfo 對象。
  3. 經過 IBackgroundJobStore 持久化任務信息。
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    // 獲取任務名稱。
    var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
    var jobId = await EnqueueAsync(jobName, args, priority, delay);
    return jobId.ToString();
}

protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    var jobInfo = new BackgroundJobInfo
    {
        Id = GuidGenerator.Create(),
        JobName = jobName,
        // 經過序列化器,序列化參數值,方便存儲。這裏內部其實使用的是 JSON.NET 進行序列化。
        JobArgs = Serializer.Serialize(args),
        Priority = priority,
        CreationTime = Clock.Now,
        NextTryTime = Clock.Now
    };

    // 若是任務有執行延遲,則任務的初始執行時間要加上這個延遲。
    if (delay.HasValue)
    {
        jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
    }

    // 持久化任務信息,方便後面執行後臺做業的工做者可以取到。
    await Store.InsertAsync(jobInfo);

    return jobInfo.Id;
}

BackgroundJobNameAttribute 相關的方法:

public static string GetName<TJobArgs>()
{
    return GetName(typeof(TJobArgs));
}

public static string GetName([NotNull] Type jobArgsType)
{
    Check.NotNull(jobArgsType, nameof(jobArgsType));

    // 判斷參數類型上面是否標註了特性,而且特性實現了 IBackgroundJobNameProvider 接口。
    return jobArgsType
                .GetCustomAttributes(true)
                .OfType<IBackgroundJobNameProvider>()
                .FirstOrDefault()
                ?.Name
        // 拿不到名字,則使用類型的 FullName。
            ?? jobArgsType.FullName;
}

後臺做業的存儲

後臺做業的存儲默認是放在內存的,這點能夠從 InMemoryBackgroundJobStore 類型實現看出來。在它的內部使用了一個並行字典,經過做業的 Guid 與做業進行關聯綁定。

除了內存實現,在 Volo.Abp.BackgroundJobs.Domain 模塊還有一個 BackgroundJobStore 實現,基本套路與 SettingStore 同樣,都是存儲到數據庫裏面。

public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
{
    protected IBackgroundJobRepository BackgroundJobRepository { get; }

    // ... 
    
    public BackgroundJobInfo Find(Guid jobId)
    {
        return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
            BackgroundJobRepository.Find(jobId)
        );
    }
    
    // ...

    public void Insert(BackgroundJobInfo jobInfo)
    {
        BackgroundJobRepository.Insert(
            ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
        );
    }

    // ...
}

後臺做業的執行

默認的後臺做業管理器是經過一個後臺工做者來執行後臺任務的,這個實現叫作 BackgroundJobWorker,這個後臺工做者的生命週期也是單例的。後臺做業的具體執行邏輯裏面,涉及到了如下幾個類型的交互。

類型 做用
AbpBackgroundJobOptions 提供每一個後臺任務的配置信息,包括任務的類型、參數類型、任務名稱數據。
AbpBackgroundJobWorkerOptions 提供後臺做業工做者的配置信息,例如每一個週期 最大執行的做業數量、後臺
工做者的 執行週期、做業執行 超時時間 等。
BackgroundJobConfiguration 後臺任務的配置信息,做用是將持久化存儲的做業信息與運行時類型進行綁定
和實例化,以便 ABP vNext 來執行具體的任務。
IBackgroundJobExecuter 後臺做業的執行器,當咱們從持久化存儲獲取到後臺做業信息時,將會經過
這個執行器來執行具體的後臺做業。
IBackgroundJobSerializer 後臺做業序列化器,用於後臺做業持久化時進行序列化的工具,默認採用的
是 JSON.NET 進行實現。
JobExecutionContext 執行器在執行後臺做業時,是經過這個上下文參數進行執行的,在這個上下
文內部,包含了後臺做業的具體類型、後臺做業的參數值。
IBackgroundJobStore 前面已經講過了,這個是用於後臺做業的持久化存儲,默認實現是存儲在內存。
BackgroundJobPriority 後臺做業的執行優先級定義,ABP vNext 在執行後臺任務時,會根據任務的優
先級進行排序,以便在後面執行的時候優先級高的任務先執行。

咱們來按照邏輯順序走一遍它的實現,首前後臺做業的執行工做者會從持久化存儲內,獲取 MaxJobFetchCount 個任務用於執行。從持久化存儲獲取後臺做業信息(BackgroundJobInfo),是由 IBackgroundJobStore 提供的。

var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);

// 不存在任何後臺做業,則直接結束本次調用。
if (!waitingJobs.Any())
{
    return;
}

InMemoryBackgroundJobStore 的相關實現:

public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
    return _jobs.Values
        .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
        .OrderByDescending(t => t.Priority)
        .ThenBy(t => t.TryCount)
        .ThenBy(t => t.NextTryTime)
        .Take(maxResultCount)
        .ToList();
}

上面的代碼能夠看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,而後根據任務的優先級從高到低進行排序。重試次數少的優先執行,預計執行時間越早的越先執行。最後從這些數據中,篩選出 maxResultCount 結果並返回。

說到這裏,咱們來看一下這個 NextTryTime 是如何被計算出來的?回想起最開始的後臺做業管理器,咱們在添加一個後臺任務的時候,就會設置這個後臺任務的 預計執行時間。第一個任務被添加到執行隊列中時,它的值通常是 Clock.Now ,也就是它被添加到隊列的時間。

不過 ABP vNext 爲了讓那些常常執行失敗的任務,有比較低的優先級再執行,就在每次任務執行失敗以後,會將 NextTryTime 的值指數級進行增長。這塊代碼能夠在 CalculateNextTryTime 裏面看到,也就是說某個任務的執行 失敗次數越高,那麼它下一次的預期執行時間就會越遠。

protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
    // 通常來講,這個 DefaultWaitFactor 因子的值是 2.0 。
    var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行掛鉤。
    var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
                        clock.Now.AddSeconds(nextWaitDuration);

    if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
    {
        return null;
    }

    return nextTryDate;
}

當預期的執行時間都超過 DefaultTimeout 的超時時間時(默認爲 2 天),說明這個任務確實沒救了,就不要再執行了。

咱們以前說到,從 IBackgroundJobStore 拿到了須要執行的後臺任務信息集合,接下來咱們就要開始執行後臺任務了。

foreach (var jobInfo in waitingJobs)
{
    jobInfo.TryCount++;
    jobInfo.LastTryTime = clock.Now;

    try
    {
        // 根據任務名稱獲取任務的配置參數。
        var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
        // 根據配置裏面存儲的任務類型,將參數值進行反序列化。
        var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
        // 構造一個新的執行上下文,讓執行器執行任務。
        var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);

        try
        {
            jobExecuter.Execute(context);

            // 若是任務執行成功則刪除該任務。            
            store.Delete(jobInfo.Id);
        }
        catch (BackgroundJobExecutionException)
        {
            // 發生任務執行失敗異常時,根據指定的公式計算下一次的執行時間。
            var nextTryTime = CalculateNextTryTime(jobInfo, clock);

            if (nextTryTime.HasValue)
            {
                jobInfo.NextTryTime = nextTryTime.Value;
            }
            else
            {
                // 超過超時時間的時候,公式計算函數返回 null,該任務置爲廢棄任務。
                jobInfo.IsAbandoned = true;
            }

            TryUpdate(store, jobInfo);
        }
    }
    catch (Exception ex)
    {
        // 執行過程當中,產生了未知異常,設置爲廢棄任務,並打印日誌。
        Logger.LogException(ex);
        jobInfo.IsAbandoned = true;
        TryUpdate(store, jobInfo);
    }
}

執行後臺任務的時候基本分爲 5 步,它們分別是:

  1. 得到任務關聯的配置參數,默認不用提供,由於在以前模塊初始化的時候就已經配置了(你也能夠顯式指定)。
  2. 經過以前存儲的配置參數,將參數值反序列化出來,構造具體實例。
  3. 構造一個執行上下文。
  4. 後臺任務執行器執行具體的後臺任務。
  5. 成功則刪除任務,失敗則更新任務下次的執行狀態。

至於執行器裏面的真正執行操做,你都拿到了參數值和任務類型了。就能夠經過類型用 IoC 獲取後臺任務對象的實例,而後經過反射匹配方法簽名,在實例上調用這個方法傳入參數便可。

public virtual void Execute(JobExecutionContext context)
{
    // 構造具體的後臺做業實例對象。
    var job = context.ServiceProvider.GetService(context.JobType);
    if (job == null)
    {
        throw new AbpException("The job type is not registered to DI: " + context.JobType);
    }

    // 得到須要執行的方法簽名。
    var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
    if (jobExecuteMethod == null)
    {
        throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
    }

    try
    {
        // 直接經過 MethodInfo 的 Invoke 方法調用,傳入具體的實例對象和參數值便可。
        jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
    }
    catch (Exception ex)
    {
        Logger.LogException(ex);

        // 若是是執行方法內的異常,則包裝進行處理,而後拋出。
        throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
        {
            JobType = context.JobType.AssemblyQualifiedName,
            JobArgs = context.JobArgs
        };
    }
}

集成 Hangfire

ABP vNext 對於 Hangfire 的集成代碼分佈在 Volo.Abp.HangFireVolo.Abp.BackgroundJobs.HangFire 模塊內部,前者是在模塊配置裏面,調用 Hangfire 庫的相關方法,注入組件到 IoC 容器當中。後者則是對後臺做業進行了適配處理,替換了默認的 IBackgroundJobManager 實現。

AbpHangfireModule 模塊內部,經過工廠建立出來一個 BackgroudJobServer 實例,並將它的生命週期與應用程序的生命週期進行綁定,以便進行銷燬處理。

public class AbpHangfireModule : AbpModule
{
    private BackgroundJobServer _backgroundJobServer;

    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddHangfire(configuration =>
        {
            context.Services.ExecutePreConfiguredActions(configuration);
        });
    }

    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
        _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown
        _backgroundJobServer.SendStop();
        _backgroundJobServer.Dispose();
    }
}

咱們直奔主題,看一下基於 Hangfire 的後臺做業管理器是怎麼實現的。

public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
        TimeSpan? delay = null)
    {
        // 若是沒有延遲參數,則直接經過 Enqueue() 方法扔進執行對了。
        if (!delay.HasValue)
        {
            return Task.FromResult(
                BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args)
                )
            );
        }
        else
        {
            return Task.FromResult(
                BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args),
                    delay.Value
                )
            );
        }
    }

上述代碼中使用 HangfireJobExecutionAdapter 進行了一個適配操做,由於 Hangfire 要將一個後臺任務扔進隊列執行,不是用 TArgs 就能解決的。

轉到這個適配器定義,提供了一個 Execute(TArgs) 方法,當被添加到 Hangfire 隊列執行的時候。實際 Hangfire 會調用適配器的 Excetue(TArgs) 方法,而後內部仍是使用的 IBackgroundJobExecuter 來執行具體定義的任務。

public class HangfireJobExecutionAdapter<TArgs>
{
    protected AbpBackgroundJobOptions Options { get; }
    protected IServiceScopeFactory ServiceScopeFactory { get; }
    protected IBackgroundJobExecuter JobExecuter { get; }

    public HangfireJobExecutionAdapter(
        IOptions<AbpBackgroundJobOptions> options, 
        IBackgroundJobExecuter jobExecuter, 
        IServiceScopeFactory serviceScopeFactory)
    {
        JobExecuter = jobExecuter;
        ServiceScopeFactory = serviceScopeFactory;
        Options = options.Value;
    }

    public void Execute(TArgs args)
    {
        using (var scope = ServiceScopeFactory.CreateScope())
        {
            var jobType = Options.GetJob(typeof(TArgs)).JobType;
            var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
            JobExecuter.Execute(context);
        }
    }
}

集成 RabbitMQ

基於 RabbitMQ 的後臺做業實現,我想放在分佈式事件總線裏面,對其一塊兒進行講解。

3、總結

ABP vNext 爲咱們提供了多種後臺做業管理器的實現,你能夠根據本身的需求選用不一樣的後臺做業管理器,又或者是本身動手造輪子。

須要看其餘的 ABP vNext 相關文章?點擊我 便可跳轉到總目錄。

相關文章
相關標籤/搜索