說下hangfire吧

最近因工做須要開發計劃任務模塊(嚴格來講應該是修改bug吧,其餘同事負責的)接觸到了Hangfire。早前聽同事說hangfire有點坑,懷着好奇,趁這兩天bug改的差很少了,在github上面down了hangfire源碼,下面分享一下,本身讀hangfire源碼的一些理解,和工做中須要注意的地方。介紹大概分爲如下幾個部分吧。1.準備工做,2.簡單使用,3.源碼分析,4.避坑。須要說明一下接觸hangfire源碼的時間不長,也就幾天時間理解不到位,或者說錯了的,但願在評論指正。
準備工做:hangfire源代碼的代碼量很少,github地址: https://github.com/HangfireIO/Hangfire,有興趣的朋友能夠本身下載瞅瞅源碼。功能上大概能夠分爲客戶端模式和服務端模式。用到的技術大概有Multi Thread、Expression、Dapper、Cron等。能夠這麼說,它的定時任務徹底就是基於多線程協做實現的。由於是多線程環境,因此我的以爲看起來有點費力。
簡單使用:.Net&.Net Core環境均可以使用,下面就以.Net Core的使用爲例。
1.客戶端和服務端獨立部署
client端
 1 public IServiceProvider ConfigureServices(IServiceCollection services)  2  {  3             // 其餘代碼
 4              services.AddHangfire(config =>
 5  {  6  config.UseSqlServerStorage(...);  7  });  8  }  9  
10 public void Configure(IApplicationBuilder app, IWebHostEnvironment env) 11  { 12             // 其餘代碼... 13             // 啓用Dashboard看板
14  app.UseHangfireDashboard(); 15         }

 

server端
 1 public void Configuration(IAppBuilder app)  2  {  3  GlobalConfiguration.Configuration  4                  .UseSqlServerStorage("鏈接字符串", new SqlServerStorageOptions  5  {  6                     // options
 7  });  8             app.UseHangfireServer(new BackgroundJobServerOptions  9  { 10  }); 11  } 12  
13
或者
1 services.AddHangfireServer(options =>
2  { 3                         // 基於IHostedService接口實現
4                     });
PS:server端還有一種實現方式,實現IHostedService接口 其實跟上面的使用方法同樣的,注入到服務就ok,在程序啓動階段會自動執行IHostedService接口的兩個方法,能夠簡單看下IHostedService接口的定義。
1   public interface IHostedService 2  { 3  Task StartAsync(CancellationToken cancellationToken); 4  Task StopAsync(CancellationToken cancellationToken); 5   }
接口就定義了兩個方法,start是在程序啓動的時候執行,固然stop就是在程序中止的時候執行。咱們用一張圖簡單描繪一下它的執行時機,圖是盜的。
以上就是hangfire的client端和server端分開部署的一個簡單應用,下面咱們看下第二種,client&server部署在同一臺機器上。
2.客戶端和服務端統一部署
1 public void Configuration(IAppBuilder app) 2 { 3     GlobalConfiguration.Configuration.UseSqlServerStorage(); // 配置數據庫鏈接
4     
5     app.UseHangfireServer(); // 啓用server
6     app.UseHangfireDashboard(); // 啓用看板
7 }

 

簡單的幾行代碼,固然我也只會簡單的用法。以上服務注入並執行,接下來就是往hangfire裏面添加任務。
1 BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); // 當即執行
2 BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); // 延後執行
3 RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); // 循環執行,支持cron表達式
簡單使用就到這吧,咱們繼續大綱的第三部分,源碼分析。
 
源碼分析
客戶端模式就不用說了,說白了就是往hangfire數據庫裏面寫任務,咱們主要是看看服務端的執行原理。咱們先找到入口,也能夠看作是NetCore裏面的一箇中間件吧。看代碼
1 app.UseHangfireServer(); // 啓用server
UseHangfireServer實現
 1 public static IAppBuilder UseHangfireServer(  2             [NotNull] this IAppBuilder builder,  3  [NotNull] JobStorage storage,  4  [NotNull] BackgroundJobServerOptions options,  5             [NotNull] params IBackgroundProcess[] additionalProcesses)  6  {  7             // 其餘代碼...
 8             var server = new BackgroundJobServer(options, storage, additionalProcesses);  9             
10             return builder; 11         }

 

UseHangfireServer擴展方法實現裏面,比較重要的一行代碼就是建立BackgroundJobServer,BackgroundJobServer實現了IBackgroundProcessingServer接口,server的啓動也就是間接在它的構造器裏面完成的。咱們不妨先瞅瞅IBackgroundProcessingServer接口和BackgroundJobServer類的定義。
 1 // IBackgroundProcessingServer
 2 public interface IBackgroundProcessingServer : IDisposable  3  {  4         void SendStop();  5         bool WaitForShutdown(TimeSpan timeout);  6  Task WaitForShutdownAsync(CancellationToken cancellationToken);  7  }  8  
 9 // BackgroundJobServer
10 public class BackgroundJobServer : IBackgroundProcessingServer 11  { 12         // 其餘成員...
13         public BackgroundJobServer( 14  [NotNull] BackgroundJobServerOptions options, 15  [NotNull] JobStorage storage, 16             [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses, 17  [CanBeNull] IJobFilterProvider filterProvider, 18  [CanBeNull] JobActivator activator, 19  [CanBeNull] IBackgroundJobFactory factory, 20  [CanBeNull] IBackgroundJobPerformer performer, 21  [CanBeNull] IBackgroundJobStateChanger stateChanger) 22  { 23             // 其餘代碼
24             var processes = new List<IBackgroundProcessDispatcherBuilder>(); 25  processes.AddRange(GetRequiredProcesses(filterProvider, activator, factory, performer, stateChanger)); 26             processes.AddRange(additionalProcesses.Select(x =>  x.UseBackgroundPool(1))); 27             var properties = new Dictionary<string, object>
28  { 29                 { "Queues", options.Queues }, 30                 { "WorkerCount", options.WorkerCount } 31  }; 32             
33             _processingServer = new BackgroundProcessingServer( 34  storage, 35  processes, 36  properties, 37  GetProcessingServerOptions()); 38  } 39         public void SendStop() 40  { 41  } 42         public void Dispose() 43  { 44  } 45         [Obsolete("This method is a stub. There is no need to call the `Start` method. Will be removed in version 2.0.0.")] 46         public void Start() 47  { 48  } 49         [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")] 50         public void Stop() 51  { 52  } 53         [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")] 54         public void Stop(bool force) 55  { 56  } 57         public bool WaitForShutdown(TimeSpan timeout) 58  { 59  } 60         public Task WaitForShutdownAsync(CancellationToken cancellationToken) 61  { 62         }

 

IBackgroundProcessingServer接口裏面的這幾個方法都是跟停用server,取消任務清理資源相關的。BackgroundJobServer類裏面真正完成接口的實現是由BackgroundProcessingServer類型的同名函數實現,這個對象是在構造函數裏面初始化的,在初始化BackgroundProcessingServer類型的同時,建立了若干IBackgroundProcessDispatcherBuilder實現類BackgroundProcessDispatcherBuilder的實例,hangfire默認實現了7種dispatcher,咱們任務、日誌、心跳等等獨立線程都是由它的Create方法完成,這個地方不算server啓動主線,會在後面細說。咱們繼續看看BackgroundProcessingServer這個類型。這裏須要注意的是裏面有幾個方法好像是被停用了,start、stop等方法,官方也註釋了,被刪除了。start方法被停用了,難道咱們的server啓動是在BackgroundProcessingServer類型裏面?繼續看BackgroundProcessingServer的定義。
 1 public sealed class BackgroundProcessingServer : IBackgroundProcessingServer  2  {  3         // 其餘成員
 4         internal BackgroundProcessingServer(  5  [NotNull] BackgroundServerProcess process,  6  [NotNull] BackgroundProcessingServerOptions options)  7  {  8             _process = process ?? throw new ArgumentNullException(nameof(process));  9             _options = options ?? throw new ArgumentNullException(nameof(options)); 10             _dispatcher = CreateDispatcher(); 11 #if !NETSTANDARD1_3
12             AppDomain.CurrentDomain.DomainUnload += OnCurrentDomainUnload; 13             AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainUnload; 14 #endif
15  } 16         public void SendStop() 17  { 18  } 19         public bool WaitForShutdown(TimeSpan timeout) 20  { 21  } 22         public async Task WaitForShutdownAsync(CancellationToken cancellationToken) 23  { 24  } 25         public void Dispose() 26  { 27             
28  } 29         private void OnCurrentDomainUnload(object sender, EventArgs args) 30  { 31             
32  } 33         private IBackgroundDispatcher CreateDispatcher() 34  { 35             var execution = new BackgroundExecution( 36  _stoppingCts.Token, 37                 new BackgroundExecutionOptions 38  { 39                     Name = nameof(BackgroundServerProcess), 40                     ErrorThreshold = TimeSpan.Zero, 41                     StillErrorThreshold = TimeSpan.Zero, 42                     RetryDelay = retry => _options.RestartDelay 43  }); 44             return new BackgroundDispatcher( 45  execution, 46  RunServer, 47  execution, 48  ThreadFactory); 49  } 50         private void RunServer(Guid executionId, object state) 51  { 52  _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token); 53  } 54         private static IEnumerable<Thread> ThreadFactory(ThreadStart threadStart) 55  { 56             yield return new Thread(threadStart) 57  { 58                 IsBackground = true, 59                 Name = $"{nameof(BackgroundServerProcess)} #{Interlocked.Increment(ref _lastThreadId)}", 60  }; 61  } 62     }

 

果不其然,server的啓動快要揭開神祕的面紗了,RunServer?翻譯過來應該是啓動服務吧,咱們暫且不去管他,先記一下這個有個runserver,咱們繼續跟蹤。在構造函數裏面調用了一個CreateDispatcher()的方法,咱們看下它的實現
 1 private IBackgroundDispatcher CreateDispatcher()  2  {  3             var execution = new BackgroundExecution(  4  _stoppingCts.Token,  5                 new BackgroundExecutionOptions  6  {  7                     Name = nameof(BackgroundServerProcess),  8                     ErrorThreshold = TimeSpan.Zero,  9                     StillErrorThreshold = TimeSpan.Zero, 10                     RetryDelay = retry => _options.RestartDelay 11  }); 12             return new BackgroundDispatcher( 13  execution, 14  RunServer, 15  execution, 16  ThreadFactory); 17         }

 

在CreateDispatcher方法裏面返回了一個BackgroundDispatcher,字面意思好像是後臺分發器,而且指定了回調runserver,BackgroundDispatcher實現了IBackgroundDispatcher接口,咱們先看下它們的定義。
 1 // IBackgroundDispatcher
 2 public interface IBackgroundDispatcher : IDisposable  3  {  4         bool Wait(TimeSpan timeout);  5  Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken);  6  }  7  
 8 // BackgroundDispatcher
 9 internal sealed class BackgroundDispatcher : IBackgroundDispatcher 10  { 11         // 其餘成員
12         public BackgroundDispatcher( 13  [NotNull] IBackgroundExecution execution, 14             [NotNull] Action<Guid, object> action, 15             [CanBeNull] object state, 16             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory) 17  { 18             if (threadFactory == null) throw new ArgumentNullException(nameof(threadFactory)); 19             _execution = execution ?? throw new ArgumentNullException(nameof(execution)); 20             _action = action ?? throw new ArgumentNullException(nameof(action)); 21             _state = state; 22 #if !NETSTANDARD1_3
23  AppDomainUnloadMonitor.EnsureInitialized(); 24 #endif
25             var threads = threadFactory(DispatchLoop)?.ToArray(); 26             if (threads == null || threads.Length == 0) 27  { 28                 throw new ArgumentException("At least one unstarted thread should be created.", nameof(threadFactory)); 29  } 30             if (threads.Any(thread => thread == null || (thread.ThreadState &  ThreadState.Unstarted) == 0)) 31  { 32                 throw new ArgumentException("All the threads should be non-null and in the ThreadState.Unstarted state.", nameof(threadFactory)); 33  } 34             _stopped = new CountdownEvent(threads.Length); 35             foreach (var thread in threads) 36  { 37  thread.Start(); 38  } 39  } 40         public bool Wait(TimeSpan timeout) 41  { 42             return _stopped.WaitHandle.WaitOne(timeout); 43  } 44         public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) 45  { 46             await _stopped.WaitHandle.WaitOneAsync(timeout,  cancellationToken).ConfigureAwait(false); 47  } 48         public void Dispose() 49  { 50  } 51         public override string ToString() 52  { 53  } 54         private void DispatchLoop() 55  { 56             try
57  { 58  _execution.Run(_action, _state); 59  } 60             catch (Exception ex) 61  { 62  
63  } 64             finally
65  { 66                 try
67  { 68  _stopped.Signal(); 69  } 70                 catch (ObjectDisposedException) 71  { 72  
73  } 74  } 75  } 76     }

 

從IBackgroundDispatcher接口的定義來看,分發器應該是負責協調資源處理,咱們具體看看BackgroundDispatcher的實現。以上代碼就是server的啓動執行核心代碼而且我以加粗,其實就是啓動線程Loop執行。在DispatchLoop方法裏面間接調用了我上面說的runserver方法。在runserver方法裏面實現了整個server端的初始化工做。咱們接着看DispatchLoop方法的實現 ,在這個方法裏面調用了IBackgroundExecution接口的run方法,繼續IBackgroundExecution接口的定義。
1 public interface IBackgroundExecution : IDisposable 2  { 3         void Run([NotNull] Action<Guid, object> callback, [CanBeNull] object state); 4         Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull]  object state); 5     }

 

就兩方法,run包含同步和異步,看看它的惟一實現類BackgroundExecution。
 1   internal sealed class BackgroundExecution : IBackgroundExecution  2  {  3                 // 其餘成員
 4         public void Run(Action<Guid, object> callback, object state)  5  {  6             if (callback == null) throw new ArgumentNullException(nameof(callback));  7             var executionId = Guid.NewGuid();  8            
 9  { 10 #if !NETSTANDARD1_3
11                 try
12 #endif
13  { 14                     HandleStarted(executionId, out var nextDelay); 15                     while (true) 16  { 17                         // Don't place anything here.
18                         try
19  { 20                            
21                             if (StopRequested) break; 22                             if (nextDelay > TimeSpan.Zero) 23  { 24  HandleDelay(executionId, nextDelay); 25  } 26  callback(executionId, state); 27                             HandleSuccess(out nextDelay); 28  } 29 #if !NETSTANDARD1_3
30                         catch (ThreadAbortException) when (AppDomainUnloadMonitor.IsUnloading) 31  { 32                             // Our thread is aborted due to AppDomain unload. It's better to give up to 33                             // not to cause the host to be more aggressive.
34                             throw; 35  } 36 #endif
37                         catch (OperationCanceledException) when (StopRequested) 38  { 39                             break; 40  } 41                         catch (Exception ex) 42  { 43                             HandleException(executionId, ex, out nextDelay); 44  } 45  } 46  HandleStop(executionId); 47  } 48 #if !NETSTANDARD1_3
49                 catch (ThreadAbortException ex) 50  { 51  HandleThreadAbort(executionId, ex); 52  } 53 #endif
54  } 55  } 56 }

 

hangfire裏面全部的獨立線程都是經過run方法執行,而後回調到本身的實現類Execute方法,自此每一個獨立的功能線程就循環幹着本身獨立的工做(這個後面會單獨分析RecurringJobScheduler)。繼續咱們的主線,server啓動,咱們以run的同步方法爲例,第一個線程(咱們就叫它主線程吧)啓動了一個while循環,在循環裏面而且callback調用了咱們的runserver方法。
    
1 private void RunServer(Guid executionId, object state) 2  { 3  _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token); 4         }

 

在runserver方法裏面的實現很簡單,直接調用了_process的execute方法,咱們簡單看下_process類型IBackgroundServerProcess的定義。
1 internal interface IBackgroundServerProcess 2  { 3         void Execute( 4  Guid executionId, 5  BackgroundExecution execution, 6  CancellationToken stoppingToken, 7  CancellationToken stoppedToken, 8  CancellationToken shutdownToken); 9     }

 

IBackgroundServerProcess的定義就一個execute方法,這個接口的工做其實就是初始化server服務端,咱們看看它的惟一實現類BackgroundServerProcess。
 1 internal sealed class BackgroundServerProcess : IBackgroundServerProcess  2  {  3         
 4         // 其餘成員
 5         public BackgroundServerProcess(  6  [NotNull] JobStorage storage,  7             [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder> dispatcherBuilders,  8  [NotNull] BackgroundProcessingServerOptions options,  9             [NotNull] IDictionary<string, object> properties)  10  {  11             if (dispatcherBuilders == null) throw new ArgumentNullException(nameof(dispatcherBuilders));  12  
 13  
 14             _storage = storage ?? throw new ArgumentNullException(nameof(storage));  15             _options = options ?? throw new ArgumentNullException(nameof(options));  16             _properties = properties ?? throw new ArgumentNullException(nameof(properties));  17  
 18  
 19             var builders = new List<IBackgroundProcessDispatcherBuilder>();  20             builders.AddRange(GetRequiredProcesses()); // 添加默認的工做dispatcher也就是獨立線程
 21  builders.AddRange(GetStorageComponents());  22  builders.AddRange(dispatcherBuilders);  23  
 24  
 25             _dispatcherBuilders = builders.ToArray();  26  }  27  
 28  
 29         public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken,  30             CancellationToken stoppedToken, CancellationToken shutdownToken)  // server初始化
 31  {  32             var serverId = GetServerId();  33             Stopwatch stoppedAt = null;  34  
 35  
 36             void HandleRestartSignal()  37  {  38                 if (!stoppingToken.IsCancellationRequested)  39  {  40                     _logger.Info($"{GetServerTemplate(serverId)} caught restart signal...");  41  }  42  }  43             using (var restartCts = new CancellationTokenSource())  44             using (var restartStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token))  45             using (var restartStoppedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token))  46             using (var restartShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token))  47             using (restartStoppingCts.Token.Register(HandleStopRestartSignal))  48             using (stoppingToken.Register(HandleStoppingSignal))  49             using (stoppedToken.Register(HandleStoppedSignal))  50             using (shutdownToken.Register(HandleShutdownSignal))  51             using (restartCts.Token.Register(HandleRestartSignal))  52  {  53                 var context = new BackgroundServerContext(  54  serverId,  55  _storage,  56  _properties,  57  restartStoppingCts.Token,  58  restartStoppedCts.Token,  59  restartShutdownCts.Token);  60                 var dispatchers = new List<IBackgroundDispatcher>();  61  CreateServer(context);  62                 try
 63  {  64                     // ReSharper disable once AccessToDisposedClosure
 65                     using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) // 建立守護線程
 66  {  67                         StartDispatchers(context, dispatchers); // 啓動hangfire默認初始化的全部獨立任務線程
 68  execution.NotifySucceeded();  69  WaitForDispatchers(context, dispatchers);  70  
 71  
 72  restartCts.Cancel();  73  
 74  heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult();  75  }  76  }  77                 finally
 78  {  79  DisposeDispatchers(dispatchers);  80  ServerDelete(context, stoppedAt);  81  }  82  }  83  }  84  
 85  
 86         private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart) // 建立守護線程
 87  {  88             return new ServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart)  89                 .UseBackgroundPool(threadCount: 1)  90  .Create(context, _options);  91  }  92  
 93  
 94         private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses() // 初始化日誌和任務監控線程
 95  {  96             yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1);  97             yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1);  98  }  99         private string GetServerId() // 獲取serverid
100  { 101             var serverName = _options.ServerName 102                  ?? Environment.GetEnvironmentVariable("COMPUTERNAME") 103                  ?? Environment.GetEnvironmentVariable("HOSTNAME"); 104             var guid = Guid.NewGuid().ToString(); 105  
106             return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}" : guid; 107  } 108  
109         
110         private void CreateServer(BackgroundServerContext context) // 建立server,寫入Server數據表
111  { 112             var stopwatch = Stopwatch.StartNew(); 113             using (var connection = _storage.GetConnection()) 114  { 115  connection.AnnounceServer(context.ServerId, GetServerContext(_properties)); 116  } 117  stopwatch.Stop(); 118  
119  
120  ServerJobCancellationToken.AddServer(context.ServerId); 121             _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms"); 122  } 123  
124  
125         private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers) // 啓動全部獨立的任務線程,包括咱們的隊列計劃、循環計劃、日誌、守護等等線程
126  { 127  
128             foreach (var dispatcherBuilder in _dispatcherBuilders) 129  { 130  dispatchers.Add(dispatcherBuilder.Create(context, _options)); 131  } 132  } 133  
134     }

 

以上代碼我有作精簡處理,不要糾結裏面的實現,代碼註釋也比較詳細。下面我作一個簡單的總結吧,第一個線程(暫時叫主線程吧)從startup裏面調用usehangfireserver擴展方法-》啓動一個新的worker線程用於初始化&啓動server-》主程返回-》啓動hangfire全部任務線程-》建立的第一個worker線程掛起(用於處理全部任務線程的資源釋放)。server的初始化工做大概就是這些,下面詳細看看hangfire的任務線程的執行原理,這裏咱們以RecurringJobScheduler循環任務爲例。
 
RecurringJobScheduler實現機制
還記得上面提到的7個dispatcher任務線程的建立嗎?這7個默認的任務線程初始化就發生在上面加粗的代碼裏面StartDispatchers方法,咱們看代碼。
1 private void StartDispatchers(BackgroundServerContext context,  ICollection<IBackgroundDispatcher> dispatchers) 2  { 3                // 其餘代碼...
4             foreach (var dispatcherBuilder in _dispatcherBuilders) 5  { 6                 dispatchers.Add(dispatcherBuilder.Create(context, _options)); // 初始化獨立任務線程
7  } 8         }

 

遍歷_dispatcherBuilders數組,7種任務類型,分別調用它們的Create方法。繼續看create方法。
   
 1  public IBackgroundDispatcher Create(BackgroundServerContext context,  BackgroundProcessingServerOptions options) // 第一步
 2  {  3             // 其餘代碼
 4             var execution = new BackgroundExecution(  5  context.StoppingToken,  6                 new BackgroundExecutionOptions  7  {  8                     Name = _process.GetType().Name,  9                     RetryDelay = options.RetryDelay 10                 }); // 定義本身的execution
11             return new BackgroundDispatcher( // 建立BackgroundDispatcher 
12  execution, 13                 ExecuteProcess, // 指定回調
14                 Tuple.Create(_process, context, execution), // 建立三元組上下文,注意一下1元組這個對象
15  _threadFactory); 16  } 17  
18 public BackgroundDispatcher(  // 第二步
19  [NotNull] IBackgroundExecution execution, 20             [NotNull] Action<Guid, object> action, 21             [CanBeNull] object state, 22             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory) 23  { 24    
25             _state = state; 26  
27             var threads = threadFactory(DispatchLoop)?.ToArray(); 28            
29             foreach (var thread in threads) 30  { 31                 thread.Start(); // 執行線程
32  } 33  } 34  
35 private void DispatchLoop() // 第三步
36  { 37             try
38  { 39                 _execution.Run(_action, _state);  // 在run裏面回調_action
40  } 41             catch (Exception ex) 42  { 43  } 44             finally
45  { 46                 try
47  { 48  _stopped.Signal(); 49  } 50                 catch (ObjectDisposedException) 51  { 52  } 53  } 54  } 55  
56 private static void ExecuteProcess(Guid executionId, object state) // 第四步 回調方法,對應上面的指定回調
57  { 58             var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext,  BackgroundExecution>)state; 59             var serverContext = tuple.Item2; 60             var context = new BackgroundProcessContext( // 建立公共上下文
61  serverContext.ServerId, 62  serverContext.Storage, 63                 serverContext.Properties.ToDictionary(x => x.Key, x => x.Value), 64  executionId, 65  serverContext.StoppingToken, 66  serverContext.StoppedToken, 67  serverContext.ShutdownToken); 68             while (!context.IsStopping) 69  { 70                 tuple.Item1.Execute(context); // 執行本身元組對應的實例
71  tuple.Item3.NotifySucceeded(); 72  } 73         }

 

上面有點亂啊,我大概簡單串起來講一下。第一步在create方法裏面建立了BackgroundDispatcher並指定了元組參數-》第二步綁定線程的執行函數Loop而且執行-》第三步執行Loop而且回調_action委託-》第四步_action參數對應的函數地址就是ExecuteProcess,最後在ExecuteProcess裏面經過元組參數調用對應的任務類型,自此7種任務類型啓動並開始工做。以上代碼還有個細節須要說明一下,Tuple.Create(_process, context, execution)。元組的第一個參數,其類型爲IBackgroundProcess,看下定義。
1 public interface IBackgroundProcess : IServerProcess 2  { 3         void Execute([NotNull] BackgroundProcessContext context); 4     }

 

接口就定義了一個方法,沒什麼特別的,可是它的幾個實現類就是咱們單獨的任務類,咱們下面要說的RecurringJobScheduler循環任務類也實現了這個接口。到此咱們的RecurringJobScheduler循環定時任務線程就算開始執行了。
RecurringJobScheduler循環定時任務機制
照舊看下這個類型的定義
 1 public class RecurringJobScheduler : IBackgroundProcess  2  {  3         // 其餘代碼
 4         public RecurringJobScheduler(  5  [NotNull] IBackgroundJobFactory factory,  6  TimeSpan pollingDelay,  7  [NotNull] ITimeZoneResolver timeZoneResolver,  8             [NotNull] Func<DateTime> nowFactory)  9  { 10             if (factory == null) throw new ArgumentNullException(nameof(factory)); 11             if (nowFactory == null) throw new ArgumentNullException(nameof(nowFactory)); 12             if (timeZoneResolver == null) throw new ArgumentNullException(nameof(timeZoneResolver)); 13  
14  
15             _factory = factory; 16             _nowFactory = nowFactory; 17             _timeZoneResolver = timeZoneResolver; 18             _pollingDelay = pollingDelay; 19             _profiler = new SlowLogProfiler(_logger); 20  } 21  
22  
23         /// <inheritdoc />
24         public void Execute(BackgroundProcessContext context) // 實現方法
25  { 26             if (context == null) throw new ArgumentNullException(nameof(context)); 27  
28  
29             var jobsEnqueued = 0; 30  
31  
32             while (EnqueueNextRecurringJobs(context)) // 從數據庫獲取定時任務
33  { 34                 jobsEnqueued++; 35  
36  
37                 if (context.IsStopping) 38  { 39                     break; 40  } 41  } 42  
43  
44             if (jobsEnqueued != 0) 45  { 46                 _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued."); 47  } 48  
49  
50             if (_pollingDelay > TimeSpan.Zero) 51  { 52  context.Wait(_pollingDelay); 53  } 54             else
55  { 56                 var now = _nowFactory(); 57                 context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now); 58  } 59  } 60     }

 

承上,調用元組的第一個參數的execute方法,RecurringJobScheduler的execute方法得以執行,該方法就幹一件事,每隔15秒從數據庫獲取待執行的計劃,每次1000條數據。經過EnqueueNextRecurringJobs方法獲取任務。
 1 private bool EnqueueNextRecurringJobs(BackgroundProcessContext context)  2  {  3             return UseConnectionDistributedLock(context.Storage, connection => 
 4  {  5                 var result = false;  6                 if (IsBatchingAvailable(connection))  7  {  8                     var now = _nowFactory();  9                     var timestamp = JobHelper.ToTimestamp(now); 10                     var recurringJobIds =  ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0,  timestamp, BatchSize); // 從數據庫裏面查詢
11                     if (recurringJobIds == null || recurringJobIds.Count == 0) return  false; 12                     foreach (var recurringJobId in recurringJobIds) 13  { 14                         if (context.IsStopping) return false; 15                         if (TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))// 排隊執行
16  { 17                             result = true; 18  } 19  } 20  } 21                 else
22  { 23                     for (var i = 0; i < BatchSize; i++) 24  { 25                         if (context.IsStopping) return false; 26                         var now = _nowFactory(); 27                         var timestamp = JobHelper.ToTimestamp(now); 28                         var recurringJobId =  connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp); 29                         if (recurringJobId == null) return false; 30                         if (!TryEnqueueBackgroundJob(context, connection, recurringJobId, now)) 31  { 32                             return false; 33  } 34  } 35  } 36                 return result; 37  }); 38         }

 

GetFirstByLowestScoreFromSet方法從數據庫Set表裏面查詢top1000數據,條件是key爲recurring-jobs字符串(表示定時任務)而且 時間範圍是0到當前時間。隨後遍歷這些jobids,排隊執行,往下看TryEnqueueBackgroundJob方法的實現。
 1 private bool EnqueueBackgroundJob(  2  BackgroundProcessContext context,  3  IStorageConnection connection,  4             string recurringJobId,  5  DateTime now)  6  {  7             // 其餘代碼
 8             using (connection.AcquireDistributedRecurringJobLock(recurringJobId, LockTimeout))  9  { 10                 try
11  { 12                     var recurringJob = connection.GetRecurringJob(recurringJobId, _timeZoneResolver, now); 13                     if (recurringJob == null) 14  { 15                         using (var transaction = connection.CreateWriteTransaction()) 16  { 17                             transaction.RemoveFromSet("recurring-jobs", recurringJobId); 18  transaction.Commit(); 19  } 20                         return false; 21  } 22           
23                     BackgroundJob backgroundJob = null; 24                     IReadOnlyDictionary<string, string> changedFields; 25                     if (recurringJob.TrySchedule(out var nextExecution, out var error)) 26  { 27                         if (nextExecution.HasValue && nextExecution <= now) 28  { 29                             backgroundJob = _factory.TriggerRecurringJob(context.Storage, connection, _profiler, recurringJob, now); 30                             if (String.IsNullOrEmpty(backgroundJob?.Id)) 31  { 32                                 _logger.Debug($"Recurring job '{recurringJobId}' execution at '{nextExecution}' has been canceled."); 33  } 34  } 35                         recurringJob.IsChanged(out changedFields, out nextExecution); 36  } 37                     else if (recurringJob.RetryAttempt < MaxRetryAttemptCount) 38  { 39                         var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay :  TimeSpan.FromMinutes(1); 40                         
41                         _logger.WarnException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be retried in {delay}.", error); 42                         recurringJob.ScheduleRetry(delay, out changedFields, out nextExecution); 43  } 44                     else
45  { 46                         _logger.ErrorException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be disabled.", error); 47                         recurringJob.Disable(error, out changedFields, out nextExecution); 48  } 49               
50                     using (var transaction = connection.CreateWriteTransaction()) 51  { 52                         if (backgroundJob != null) 53  { 54  _factory.StateMachine.EnqueueBackgroundJob( 55  context.Storage, 56  connection, 57  transaction, 58  recurringJob, 59  backgroundJob, 60                                 "Triggered by recurring job scheduler", 61  _profiler); 62  } 63  transaction.UpdateRecurringJob(recurringJob, changedFields, nextExecution, _logger); 64  transaction.Commit(); 65                         return true; 66  } 67  } 68                 catch (TimeZoneNotFoundException ex) 69  { 70                 catch (Exception ex) 71  { 72    
73  } 74                 return false; 75  } 76         }

 

須要注意的地方我都有加粗,該方法大概流程是:1.GetRecurringJob根據jobid從Hash表裏面查詢一條完整的定時任務,2.TrySchedule獲取該任務的下次執行時間,若是下次執行時間小於當前,執行這條任務(並不是真正執行定時任務,只是往job表裏面寫數據,真正執行任務由worker完成),3.獲取下次執行時間&全部任務字段,4.狀態機修改任務狀態。定時任務就這樣周而復始的重複執行以上流程。這裏簡單說下worker的執行機制,其實際就是輪詢檢索job表裏面的數據執行任務表達式樹,worker在hangfire裏面默認開啓了20個線程。第三部分就到這吧。
 
避坑
簡單說下我的在改bug期間遇到的一些問題啊。
1.時區問題,在添加定時任務時若是不指定時區信息,默認使用的是utc時間,咱們中國是東8區,也就是說解析出來的執行時間會晚8個小時執行。解決辦法有幾種能夠經過全局指定options的ITimeZoneResolver屬性指定,也能夠經過AddorUpdate方法指定,若是是指定時區信息,須要注意看板上面的異常信息,若是有異常會致使任務不執行,時區信息它是從系統裏面檢索出來的,沒有就拋異常。就這樣吧。
相關文章
相關標籤/搜索