最近因工做須要開發計劃任務模塊(嚴格來講應該是修改bug吧,其餘同事負責的)接觸到了Hangfire。早前聽同事說hangfire有點坑,懷着好奇,趁這兩天bug改的差很少了,在github上面down了hangfire源碼,下面分享一下,本身讀hangfire源碼的一些理解,和工做中須要注意的地方。介紹大概分爲如下幾個部分吧。1.準備工做,2.簡單使用,3.源碼分析,4.避坑。須要說明一下接觸hangfire源碼的時間不長,也就幾天時間理解不到位,或者說錯了的,但願在評論指正。
準備工做:hangfire源代碼的代碼量很少,github地址:
https://github.com/HangfireIO/Hangfire,有興趣的朋友能夠本身下載瞅瞅源碼。功能上大概能夠分爲客戶端模式和服務端模式。用到的技術大概有Multi Thread、Expression、Dapper、Cron等。能夠這麼說,它的定時任務徹底就是基於多線程協做實現的。由於是多線程環境,因此我的以爲看起來有點費力。
簡單使用:.Net&.Net Core環境均可以使用,下面就以.Net Core的使用爲例。
1.客戶端和服務端獨立部署
client端
1 public IServiceProvider ConfigureServices(IServiceCollection services) 2 { 3 // 其餘代碼 4 services.AddHangfire(config => 5 { 6 config.UseSqlServerStorage(...); 7 }); 8 } 9 10 public void Configure(IApplicationBuilder app, IWebHostEnvironment env) 11 { 12 // 其餘代碼... 13 // 啓用Dashboard看板 14 app.UseHangfireDashboard(); 15 }
server端
1 public void Configuration(IAppBuilder app) 2 { 3 GlobalConfiguration.Configuration 4 .UseSqlServerStorage("鏈接字符串", new SqlServerStorageOptions 5 { 6 // options 7 }); 8 app.UseHangfireServer(new BackgroundJobServerOptions 9 { 10 }); 11 } 12 13
或者
1 services.AddHangfireServer(options => 2 { 3 // 基於IHostedService接口實現 4 });
PS:server端還有一種實現方式,實現IHostedService接口 其實跟上面的使用方法同樣的,注入到服務就ok,在程序啓動階段會自動執行IHostedService接口的兩個方法,能夠簡單看下IHostedService接口的定義。
1 public interface IHostedService 2 { 3 Task StartAsync(CancellationToken cancellationToken); 4 Task StopAsync(CancellationToken cancellationToken); 5 }
接口就定義了兩個方法,start是在程序啓動的時候執行,固然stop就是在程序中止的時候執行。咱們用一張圖簡單描繪一下它的執行時機,圖是盜的。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
以上就是hangfire的client端和server端分開部署的一個簡單應用,下面咱們看下第二種,client&server部署在同一臺機器上。
2.客戶端和服務端統一部署
1 public void Configuration(IAppBuilder app) 2 { 3 GlobalConfiguration.Configuration.UseSqlServerStorage(); // 配置數據庫鏈接 4 5 app.UseHangfireServer(); // 啓用server 6 app.UseHangfireDashboard(); // 啓用看板 7 }
簡單的幾行代碼,固然我也只會簡單的用法。以上服務注入並執行,接下來就是往hangfire裏面添加任務。
1 BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); // 當即執行 2 BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); // 延後執行 3 RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); // 循環執行,支持cron表達式
簡單使用就到這吧,咱們繼續大綱的第三部分,源碼分析。
源碼分析
客戶端模式就不用說了,說白了就是往hangfire數據庫裏面寫任務,咱們主要是看看服務端的執行原理。咱們先找到入口,也能夠看作是NetCore裏面的一箇中間件吧。看代碼
1 app.UseHangfireServer(); // 啓用server
UseHangfireServer實現
1 public static IAppBuilder UseHangfireServer( 2 [NotNull] this IAppBuilder builder, 3 [NotNull] JobStorage storage, 4 [NotNull] BackgroundJobServerOptions options, 5 [NotNull] params IBackgroundProcess[] additionalProcesses) 6 { 7 // 其餘代碼... 8 var server = new BackgroundJobServer(options, storage, additionalProcesses); 9 10 return builder; 11 }
UseHangfireServer擴展方法實現裏面,比較重要的一行代碼就是建立BackgroundJobServer,BackgroundJobServer實現了IBackgroundProcessingServer接口,server的啓動也就是間接在它的構造器裏面完成的。咱們不妨先瞅瞅IBackgroundProcessingServer接口和BackgroundJobServer類的定義。
1 // IBackgroundProcessingServer 2 public interface IBackgroundProcessingServer : IDisposable 3 { 4 void SendStop(); 5 bool WaitForShutdown(TimeSpan timeout); 6 Task WaitForShutdownAsync(CancellationToken cancellationToken); 7 } 8 9 // BackgroundJobServer 10 public class BackgroundJobServer : IBackgroundProcessingServer 11 { 12 // 其餘成員... 13 public BackgroundJobServer( 14 [NotNull] BackgroundJobServerOptions options, 15 [NotNull] JobStorage storage, 16 [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses, 17 [CanBeNull] IJobFilterProvider filterProvider, 18 [CanBeNull] JobActivator activator, 19 [CanBeNull] IBackgroundJobFactory factory, 20 [CanBeNull] IBackgroundJobPerformer performer, 21 [CanBeNull] IBackgroundJobStateChanger stateChanger) 22 { 23 // 其餘代碼 24 var processes = new List<IBackgroundProcessDispatcherBuilder>(); 25 processes.AddRange(GetRequiredProcesses(filterProvider, activator, factory, performer, stateChanger)); 26 processes.AddRange(additionalProcesses.Select(x => x.UseBackgroundPool(1))); 27 var properties = new Dictionary<string, object> 28 { 29 { "Queues", options.Queues }, 30 { "WorkerCount", options.WorkerCount } 31 }; 32 33 _processingServer = new BackgroundProcessingServer( 34 storage, 35 processes, 36 properties, 37 GetProcessingServerOptions()); 38 } 39 public void SendStop() 40 { 41 } 42 public void Dispose() 43 { 44 } 45 [Obsolete("This method is a stub. There is no need to call the `Start` method. Will be removed in version 2.0.0.")] 46 public void Start() 47 { 48 } 49 [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")] 50 public void Stop() 51 { 52 } 53 [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")] 54 public void Stop(bool force) 55 { 56 } 57 public bool WaitForShutdown(TimeSpan timeout) 58 { 59 } 60 public Task WaitForShutdownAsync(CancellationToken cancellationToken) 61 { 62 }
IBackgroundProcessingServer接口裏面的這幾個方法都是跟停用server,取消任務清理資源相關的。BackgroundJobServer類裏面真正完成接口的實現是由BackgroundProcessingServer類型的同名函數實現,這個對象是在構造函數裏面初始化的,在初始化BackgroundProcessingServer類型的同時,建立了若干IBackgroundProcessDispatcherBuilder實現類BackgroundProcessDispatcherBuilder的實例,hangfire默認實現了7種dispatcher,咱們任務、日誌、心跳等等獨立線程都是由它的Create方法完成,這個地方不算server啓動主線,會在後面細說。咱們繼續看看BackgroundProcessingServer這個類型。這裏須要注意的是裏面有幾個方法好像是被停用了,start、stop等方法,官方也註釋了,被刪除了。start方法被停用了,難道咱們的server啓動是在BackgroundProcessingServer類型裏面?繼續看BackgroundProcessingServer的定義。
1 public sealed class BackgroundProcessingServer : IBackgroundProcessingServer 2 { 3 // 其餘成員 4 internal BackgroundProcessingServer( 5 [NotNull] BackgroundServerProcess process, 6 [NotNull] BackgroundProcessingServerOptions options) 7 { 8 _process = process ?? throw new ArgumentNullException(nameof(process)); 9 _options = options ?? throw new ArgumentNullException(nameof(options)); 10 _dispatcher = CreateDispatcher(); 11 #if !NETSTANDARD1_3 12 AppDomain.CurrentDomain.DomainUnload += OnCurrentDomainUnload; 13 AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainUnload; 14 #endif 15 } 16 public void SendStop() 17 { 18 } 19 public bool WaitForShutdown(TimeSpan timeout) 20 { 21 } 22 public async Task WaitForShutdownAsync(CancellationToken cancellationToken) 23 { 24 } 25 public void Dispose() 26 { 27 28 } 29 private void OnCurrentDomainUnload(object sender, EventArgs args) 30 { 31 32 } 33 private IBackgroundDispatcher CreateDispatcher() 34 { 35 var execution = new BackgroundExecution( 36 _stoppingCts.Token, 37 new BackgroundExecutionOptions 38 { 39 Name = nameof(BackgroundServerProcess), 40 ErrorThreshold = TimeSpan.Zero, 41 StillErrorThreshold = TimeSpan.Zero, 42 RetryDelay = retry => _options.RestartDelay 43 }); 44 return new BackgroundDispatcher( 45 execution, 46 RunServer, 47 execution, 48 ThreadFactory); 49 } 50 private void RunServer(Guid executionId, object state) 51 { 52 _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token); 53 } 54 private static IEnumerable<Thread> ThreadFactory(ThreadStart threadStart) 55 { 56 yield return new Thread(threadStart) 57 { 58 IsBackground = true, 59 Name = $"{nameof(BackgroundServerProcess)} #{Interlocked.Increment(ref _lastThreadId)}", 60 }; 61 } 62 }
果不其然,server的啓動快要揭開神祕的面紗了,RunServer?翻譯過來應該是啓動服務吧,咱們暫且不去管他,先記一下這個有個runserver,咱們繼續跟蹤。在構造函數裏面調用了一個CreateDispatcher()的方法,咱們看下它的實現
1 private IBackgroundDispatcher CreateDispatcher() 2 { 3 var execution = new BackgroundExecution( 4 _stoppingCts.Token, 5 new BackgroundExecutionOptions 6 { 7 Name = nameof(BackgroundServerProcess), 8 ErrorThreshold = TimeSpan.Zero, 9 StillErrorThreshold = TimeSpan.Zero, 10 RetryDelay = retry => _options.RestartDelay 11 }); 12 return new BackgroundDispatcher( 13 execution, 14 RunServer, 15 execution, 16 ThreadFactory); 17 }
在CreateDispatcher方法裏面返回了一個BackgroundDispatcher,字面意思好像是後臺分發器,而且指定了回調runserver,BackgroundDispatcher實現了IBackgroundDispatcher接口,咱們先看下它們的定義。
1 // IBackgroundDispatcher 2 public interface IBackgroundDispatcher : IDisposable 3 { 4 bool Wait(TimeSpan timeout); 5 Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken); 6 } 7 8 // BackgroundDispatcher 9 internal sealed class BackgroundDispatcher : IBackgroundDispatcher 10 { 11 // 其餘成員 12 public BackgroundDispatcher( 13 [NotNull] IBackgroundExecution execution, 14 [NotNull] Action<Guid, object> action, 15 [CanBeNull] object state, 16 [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory) 17 { 18 if (threadFactory == null) throw new ArgumentNullException(nameof(threadFactory)); 19 _execution = execution ?? throw new ArgumentNullException(nameof(execution)); 20 _action = action ?? throw new ArgumentNullException(nameof(action)); 21 _state = state; 22 #if !NETSTANDARD1_3 23 AppDomainUnloadMonitor.EnsureInitialized(); 24 #endif 25 var threads = threadFactory(DispatchLoop)?.ToArray(); 26 if (threads == null || threads.Length == 0) 27 { 28 throw new ArgumentException("At least one unstarted thread should be created.", nameof(threadFactory)); 29 } 30 if (threads.Any(thread => thread == null || (thread.ThreadState & ThreadState.Unstarted) == 0)) 31 { 32 throw new ArgumentException("All the threads should be non-null and in the ThreadState.Unstarted state.", nameof(threadFactory)); 33 } 34 _stopped = new CountdownEvent(threads.Length); 35 foreach (var thread in threads) 36 { 37 thread.Start(); 38 } 39 } 40 public bool Wait(TimeSpan timeout) 41 { 42 return _stopped.WaitHandle.WaitOne(timeout); 43 } 44 public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) 45 { 46 await _stopped.WaitHandle.WaitOneAsync(timeout, cancellationToken).ConfigureAwait(false); 47 } 48 public void Dispose() 49 { 50 } 51 public override string ToString() 52 { 53 } 54 private void DispatchLoop() 55 { 56 try 57 { 58 _execution.Run(_action, _state); 59 } 60 catch (Exception ex) 61 { 62 63 } 64 finally 65 { 66 try 67 { 68 _stopped.Signal(); 69 } 70 catch (ObjectDisposedException) 71 { 72 73 } 74 } 75 } 76 }
從IBackgroundDispatcher接口的定義來看,分發器應該是負責協調資源處理,咱們具體看看BackgroundDispatcher的實現。以上代碼就是server的啓動執行核心代碼而且我以加粗,其實就是啓動線程Loop執行。在DispatchLoop方法裏面間接調用了我上面說的runserver方法。在runserver方法裏面實現了整個server端的初始化工做。咱們接着看DispatchLoop方法的實現 ,在這個方法裏面調用了IBackgroundExecution接口的run方法,繼續IBackgroundExecution接口的定義。
1 public interface IBackgroundExecution : IDisposable 2 { 3 void Run([NotNull] Action<Guid, object> callback, [CanBeNull] object state); 4 Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull] object state); 5 }
就兩方法,run包含同步和異步,看看它的惟一實現類BackgroundExecution。
1 internal sealed class BackgroundExecution : IBackgroundExecution 2 { 3 // 其餘成員 4 public void Run(Action<Guid, object> callback, object state) 5 { 6 if (callback == null) throw new ArgumentNullException(nameof(callback)); 7 var executionId = Guid.NewGuid(); 8 9 { 10 #if !NETSTANDARD1_3 11 try 12 #endif 13 { 14 HandleStarted(executionId, out var nextDelay); 15 while (true) 16 { 17 // Don't place anything here. 18 try 19 { 20 21 if (StopRequested) break; 22 if (nextDelay > TimeSpan.Zero) 23 { 24 HandleDelay(executionId, nextDelay); 25 } 26 callback(executionId, state); 27 HandleSuccess(out nextDelay); 28 } 29 #if !NETSTANDARD1_3 30 catch (ThreadAbortException) when (AppDomainUnloadMonitor.IsUnloading) 31 { 32 // Our thread is aborted due to AppDomain unload. It's better to give up to 33 // not to cause the host to be more aggressive. 34 throw; 35 } 36 #endif 37 catch (OperationCanceledException) when (StopRequested) 38 { 39 break; 40 } 41 catch (Exception ex) 42 { 43 HandleException(executionId, ex, out nextDelay); 44 } 45 } 46 HandleStop(executionId); 47 } 48 #if !NETSTANDARD1_3 49 catch (ThreadAbortException ex) 50 { 51 HandleThreadAbort(executionId, ex); 52 } 53 #endif 54 } 55 } 56 }
hangfire裏面全部的獨立線程都是經過run方法執行,而後回調到本身的實現類Execute方法,自此每一個獨立的功能線程就循環幹着本身獨立的工做(這個後面會單獨分析RecurringJobScheduler)。繼續咱們的主線,server啓動,咱們以run的同步方法爲例,第一個線程(咱們就叫它主線程吧)啓動了一個while循環,在循環裏面而且callback調用了咱們的runserver方法。
1 private void RunServer(Guid executionId, object state) 2 { 3 _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token); 4 }
在runserver方法裏面的實現很簡單,直接調用了_process的execute方法,咱們簡單看下_process類型IBackgroundServerProcess的定義。
1 internal interface IBackgroundServerProcess 2 { 3 void Execute( 4 Guid executionId, 5 BackgroundExecution execution, 6 CancellationToken stoppingToken, 7 CancellationToken stoppedToken, 8 CancellationToken shutdownToken); 9 }
IBackgroundServerProcess的定義就一個execute方法,這個接口的工做其實就是初始化server服務端,咱們看看它的惟一實現類BackgroundServerProcess。
1 internal sealed class BackgroundServerProcess : IBackgroundServerProcess 2 { 3 4 // 其餘成員 5 public BackgroundServerProcess( 6 [NotNull] JobStorage storage, 7 [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder> dispatcherBuilders, 8 [NotNull] BackgroundProcessingServerOptions options, 9 [NotNull] IDictionary<string, object> properties) 10 { 11 if (dispatcherBuilders == null) throw new ArgumentNullException(nameof(dispatcherBuilders)); 12 13 14 _storage = storage ?? throw new ArgumentNullException(nameof(storage)); 15 _options = options ?? throw new ArgumentNullException(nameof(options)); 16 _properties = properties ?? throw new ArgumentNullException(nameof(properties)); 17 18 19 var builders = new List<IBackgroundProcessDispatcherBuilder>(); 20 builders.AddRange(GetRequiredProcesses()); // 添加默認的工做dispatcher也就是獨立線程 21 builders.AddRange(GetStorageComponents()); 22 builders.AddRange(dispatcherBuilders); 23 24 25 _dispatcherBuilders = builders.ToArray(); 26 } 27 28 29 public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken, 30 CancellationToken stoppedToken, CancellationToken shutdownToken) // server初始化 31 { 32 var serverId = GetServerId(); 33 Stopwatch stoppedAt = null; 34 35 36 void HandleRestartSignal() 37 { 38 if (!stoppingToken.IsCancellationRequested) 39 { 40 _logger.Info($"{GetServerTemplate(serverId)} caught restart signal..."); 41 } 42 } 43 using (var restartCts = new CancellationTokenSource()) 44 using (var restartStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token)) 45 using (var restartStoppedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token)) 46 using (var restartShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token)) 47 using (restartStoppingCts.Token.Register(HandleStopRestartSignal)) 48 using (stoppingToken.Register(HandleStoppingSignal)) 49 using (stoppedToken.Register(HandleStoppedSignal)) 50 using (shutdownToken.Register(HandleShutdownSignal)) 51 using (restartCts.Token.Register(HandleRestartSignal)) 52 { 53 var context = new BackgroundServerContext( 54 serverId, 55 _storage, 56 _properties, 57 restartStoppingCts.Token, 58 restartStoppedCts.Token, 59 restartShutdownCts.Token); 60 var dispatchers = new List<IBackgroundDispatcher>(); 61 CreateServer(context); 62 try 63 { 64 // ReSharper disable once AccessToDisposedClosure 65 using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) // 建立守護線程 66 { 67 StartDispatchers(context, dispatchers); // 啓動hangfire默認初始化的全部獨立任務線程 68 execution.NotifySucceeded(); 69 WaitForDispatchers(context, dispatchers); 70 71 72 restartCts.Cancel(); 73 74 heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult(); 75 } 76 } 77 finally 78 { 79 DisposeDispatchers(dispatchers); 80 ServerDelete(context, stoppedAt); 81 } 82 } 83 } 84 85 86 private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart) // 建立守護線程 87 { 88 return new ServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart) 89 .UseBackgroundPool(threadCount: 1) 90 .Create(context, _options); 91 } 92 93 94 private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses() // 初始化日誌和任務監控線程 95 { 96 yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1); 97 yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1); 98 } 99 private string GetServerId() // 獲取serverid 100 { 101 var serverName = _options.ServerName 102 ?? Environment.GetEnvironmentVariable("COMPUTERNAME") 103 ?? Environment.GetEnvironmentVariable("HOSTNAME"); 104 var guid = Guid.NewGuid().ToString(); 105 106 return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}" : guid; 107 } 108 109 110 private void CreateServer(BackgroundServerContext context) // 建立server,寫入Server數據表 111 { 112 var stopwatch = Stopwatch.StartNew(); 113 using (var connection = _storage.GetConnection()) 114 { 115 connection.AnnounceServer(context.ServerId, GetServerContext(_properties)); 116 } 117 stopwatch.Stop(); 118 119 120 ServerJobCancellationToken.AddServer(context.ServerId); 121 _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms"); 122 } 123 124 125 private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers) // 啓動全部獨立的任務線程,包括咱們的隊列計劃、循環計劃、日誌、守護等等線程 126 { 127 128 foreach (var dispatcherBuilder in _dispatcherBuilders) 129 { 130 dispatchers.Add(dispatcherBuilder.Create(context, _options)); 131 } 132 } 133 134 }
以上代碼我有作精簡處理,不要糾結裏面的實現,代碼註釋也比較詳細。下面我作一個簡單的總結吧,第一個線程(暫時叫主線程吧)從startup裏面調用usehangfireserver擴展方法-》啓動一個新的worker線程用於初始化&啓動server-》主程返回-》啓動hangfire全部任務線程-》建立的第一個worker線程掛起(用於處理全部任務線程的資源釋放)。server的初始化工做大概就是這些,下面詳細看看hangfire的任務線程的執行原理,這裏咱們以RecurringJobScheduler循環任務爲例。
RecurringJobScheduler實現機制
還記得上面提到的7個dispatcher任務線程的建立嗎?這7個默認的任務線程初始化就發生在上面加粗的代碼裏面StartDispatchers方法,咱們看代碼。
1 private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers) 2 { 3 // 其餘代碼... 4 foreach (var dispatcherBuilder in _dispatcherBuilders) 5 { 6 dispatchers.Add(dispatcherBuilder.Create(context, _options)); // 初始化獨立任務線程 7 } 8 }
遍歷_dispatcherBuilders數組,7種任務類型,分別調用它們的Create方法。繼續看create方法。
1 public IBackgroundDispatcher Create(BackgroundServerContext context, BackgroundProcessingServerOptions options) // 第一步 2 { 3 // 其餘代碼 4 var execution = new BackgroundExecution( 5 context.StoppingToken, 6 new BackgroundExecutionOptions 7 { 8 Name = _process.GetType().Name, 9 RetryDelay = options.RetryDelay 10 }); // 定義本身的execution 11 return new BackgroundDispatcher( // 建立BackgroundDispatcher 12 execution, 13 ExecuteProcess, // 指定回調 14 Tuple.Create(_process, context, execution), // 建立三元組上下文,注意一下1元組這個對象 15 _threadFactory); 16 } 17 18 public BackgroundDispatcher( // 第二步 19 [NotNull] IBackgroundExecution execution, 20 [NotNull] Action<Guid, object> action, 21 [CanBeNull] object state, 22 [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory) 23 { 24 25 _state = state; 26 27 var threads = threadFactory(DispatchLoop)?.ToArray(); 28 29 foreach (var thread in threads) 30 { 31 thread.Start(); // 執行線程 32 } 33 } 34 35 private void DispatchLoop() // 第三步 36 { 37 try 38 { 39 _execution.Run(_action, _state); // 在run裏面回調_action 40 } 41 catch (Exception ex) 42 { 43 } 44 finally 45 { 46 try 47 { 48 _stopped.Signal(); 49 } 50 catch (ObjectDisposedException) 51 { 52 } 53 } 54 } 55 56 private static void ExecuteProcess(Guid executionId, object state) // 第四步 回調方法,對應上面的指定回調 57 { 58 var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext, BackgroundExecution>)state; 59 var serverContext = tuple.Item2; 60 var context = new BackgroundProcessContext( // 建立公共上下文 61 serverContext.ServerId, 62 serverContext.Storage, 63 serverContext.Properties.ToDictionary(x => x.Key, x => x.Value), 64 executionId, 65 serverContext.StoppingToken, 66 serverContext.StoppedToken, 67 serverContext.ShutdownToken); 68 while (!context.IsStopping) 69 { 70 tuple.Item1.Execute(context); // 執行本身元組對應的實例 71 tuple.Item3.NotifySucceeded(); 72 } 73 }
上面有點亂啊,我大概簡單串起來講一下。第一步在create方法裏面建立了BackgroundDispatcher並指定了元組參數-》第二步綁定線程的執行函數Loop而且執行-》第三步執行Loop而且回調_action委託-》第四步_action參數對應的函數地址就是ExecuteProcess,最後在ExecuteProcess裏面經過元組參數調用對應的任務類型,自此7種任務類型啓動並開始工做。以上代碼還有個細節須要說明一下,Tuple.Create(_process, context, execution)。元組的第一個參數,其類型爲IBackgroundProcess,看下定義。
1 public interface IBackgroundProcess : IServerProcess 2 { 3 void Execute([NotNull] BackgroundProcessContext context); 4 }
接口就定義了一個方法,沒什麼特別的,可是它的幾個實現類就是咱們單獨的任務類,咱們下面要說的RecurringJobScheduler循環任務類也實現了這個接口。到此咱們的RecurringJobScheduler循環定時任務線程就算開始執行了。
RecurringJobScheduler循環定時任務機制
照舊看下這個類型的定義
1 public class RecurringJobScheduler : IBackgroundProcess 2 { 3 // 其餘代碼 4 public RecurringJobScheduler( 5 [NotNull] IBackgroundJobFactory factory, 6 TimeSpan pollingDelay, 7 [NotNull] ITimeZoneResolver timeZoneResolver, 8 [NotNull] Func<DateTime> nowFactory) 9 { 10 if (factory == null) throw new ArgumentNullException(nameof(factory)); 11 if (nowFactory == null) throw new ArgumentNullException(nameof(nowFactory)); 12 if (timeZoneResolver == null) throw new ArgumentNullException(nameof(timeZoneResolver)); 13 14 15 _factory = factory; 16 _nowFactory = nowFactory; 17 _timeZoneResolver = timeZoneResolver; 18 _pollingDelay = pollingDelay; 19 _profiler = new SlowLogProfiler(_logger); 20 } 21 22 23 /// <inheritdoc /> 24 public void Execute(BackgroundProcessContext context) // 實現方法 25 { 26 if (context == null) throw new ArgumentNullException(nameof(context)); 27 28 29 var jobsEnqueued = 0; 30 31 32 while (EnqueueNextRecurringJobs(context)) // 從數據庫獲取定時任務 33 { 34 jobsEnqueued++; 35 36 37 if (context.IsStopping) 38 { 39 break; 40 } 41 } 42 43 44 if (jobsEnqueued != 0) 45 { 46 _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued."); 47 } 48 49 50 if (_pollingDelay > TimeSpan.Zero) 51 { 52 context.Wait(_pollingDelay); 53 } 54 else 55 { 56 var now = _nowFactory(); 57 context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now); 58 } 59 } 60 }
承上,調用元組的第一個參數的execute方法,RecurringJobScheduler的execute方法得以執行,該方法就幹一件事,每隔15秒從數據庫獲取待執行的計劃,每次1000條數據。經過EnqueueNextRecurringJobs方法獲取任務。
1 private bool EnqueueNextRecurringJobs(BackgroundProcessContext context) 2 { 3 return UseConnectionDistributedLock(context.Storage, connection => 4 { 5 var result = false; 6 if (IsBatchingAvailable(connection)) 7 { 8 var now = _nowFactory(); 9 var timestamp = JobHelper.ToTimestamp(now); 10 var recurringJobIds = ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp, BatchSize); // 從數據庫裏面查詢 11 if (recurringJobIds == null || recurringJobIds.Count == 0) return false; 12 foreach (var recurringJobId in recurringJobIds) 13 { 14 if (context.IsStopping) return false; 15 if (TryEnqueueBackgroundJob(context, connection, recurringJobId, now))// 排隊執行 16 { 17 result = true; 18 } 19 } 20 } 21 else 22 { 23 for (var i = 0; i < BatchSize; i++) 24 { 25 if (context.IsStopping) return false; 26 var now = _nowFactory(); 27 var timestamp = JobHelper.ToTimestamp(now); 28 var recurringJobId = connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp); 29 if (recurringJobId == null) return false; 30 if (!TryEnqueueBackgroundJob(context, connection, recurringJobId, now)) 31 { 32 return false; 33 } 34 } 35 } 36 return result; 37 }); 38 }
GetFirstByLowestScoreFromSet方法從數據庫Set表裏面查詢top1000數據,條件是key爲recurring-jobs字符串(表示定時任務)而且 時間範圍是0到當前時間。隨後遍歷這些jobids,排隊執行,往下看TryEnqueueBackgroundJob方法的實現。
1 private bool EnqueueBackgroundJob( 2 BackgroundProcessContext context, 3 IStorageConnection connection, 4 string recurringJobId, 5 DateTime now) 6 { 7 // 其餘代碼 8 using (connection.AcquireDistributedRecurringJobLock(recurringJobId, LockTimeout)) 9 { 10 try 11 { 12 var recurringJob = connection.GetRecurringJob(recurringJobId, _timeZoneResolver, now); 13 if (recurringJob == null) 14 { 15 using (var transaction = connection.CreateWriteTransaction()) 16 { 17 transaction.RemoveFromSet("recurring-jobs", recurringJobId); 18 transaction.Commit(); 19 } 20 return false; 21 } 22 23 BackgroundJob backgroundJob = null; 24 IReadOnlyDictionary<string, string> changedFields; 25 if (recurringJob.TrySchedule(out var nextExecution, out var error)) 26 { 27 if (nextExecution.HasValue && nextExecution <= now) 28 { 29 backgroundJob = _factory.TriggerRecurringJob(context.Storage, connection, _profiler, recurringJob, now); 30 if (String.IsNullOrEmpty(backgroundJob?.Id)) 31 { 32 _logger.Debug($"Recurring job '{recurringJobId}' execution at '{nextExecution}' has been canceled."); 33 } 34 } 35 recurringJob.IsChanged(out changedFields, out nextExecution); 36 } 37 else if (recurringJob.RetryAttempt < MaxRetryAttemptCount) 38 { 39 var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay : TimeSpan.FromMinutes(1); 40 41 _logger.WarnException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be retried in {delay}.", error); 42 recurringJob.ScheduleRetry(delay, out changedFields, out nextExecution); 43 } 44 else 45 { 46 _logger.ErrorException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be disabled.", error); 47 recurringJob.Disable(error, out changedFields, out nextExecution); 48 } 49 50 using (var transaction = connection.CreateWriteTransaction()) 51 { 52 if (backgroundJob != null) 53 { 54 _factory.StateMachine.EnqueueBackgroundJob( 55 context.Storage, 56 connection, 57 transaction, 58 recurringJob, 59 backgroundJob, 60 "Triggered by recurring job scheduler", 61 _profiler); 62 } 63 transaction.UpdateRecurringJob(recurringJob, changedFields, nextExecution, _logger); 64 transaction.Commit(); 65 return true; 66 } 67 } 68 catch (TimeZoneNotFoundException ex) 69 { 70 catch (Exception ex) 71 { 72 73 } 74 return false; 75 } 76 }
須要注意的地方我都有加粗,該方法大概流程是:1.GetRecurringJob根據jobid從Hash表裏面查詢一條完整的定時任務,2.TrySchedule獲取該任務的下次執行時間,若是下次執行時間小於當前,執行這條任務(並不是真正執行定時任務,只是往job表裏面寫數據,真正執行任務由worker完成),3.獲取下次執行時間&全部任務字段,4.狀態機修改任務狀態。定時任務就這樣周而復始的重複執行以上流程。這裏簡單說下worker的執行機制,其實際就是輪詢檢索job表裏面的數據執行任務表達式樹,worker在hangfire裏面默認開啓了20個線程。第三部分就到這吧。
避坑
簡單說下我的在改bug期間遇到的一些問題啊。
1.時區問題,在添加定時任務時若是不指定時區信息,默認使用的是utc時間,咱們中國是東8區,也就是說解析出來的執行時間會晚8個小時執行。解決辦法有幾種能夠經過全局指定options的ITimeZoneResolver屬性指定,也能夠經過AddorUpdate方法指定,若是是指定時區信息,須要注意看板上面的異常信息,若是有異常會致使任務不執行,時區信息它是從系統裏面檢索出來的,沒有就拋異常。就這樣吧。