.NET Core下開源任務調度框架Hangfire的Api任務拓展(支持秒級任務)

HangFire的拓展和使用

看了不少博客,小白第一次寫博客。html

最近因爲以前的任務調度框架總出現問題,所以想尋找一個替代品,以前使用的是Quartz.Net,這個框架方便之處就是支持cron表達式適合複雜日期場景使用,以及秒級任務。可是配置比較複雜,並且管理不方便,本身開發了個web管理頁面,不過這個須要額外的單獨線程去統一管理工做狀態,很容易出現問題。mysql

有考慮過 「FluentScheduler」 ,使用簡單,可是管理配置也很麻煩,我但願能作到配置簡單,管理方便,高性能。最後想到了之前聽過的hangfire,它的好處就是自帶控制面板,在園子裏看了不少相關資料,偶然發現了有人拓展過hangfire經過調用api接口來執行任務,這種方式能夠避免依賴本地代碼,方便部署,在此基礎上,我用空閒時間拓展了一下如今已經基本能夠知足需求。linux

 

所拓展的功能所有屬於外部拓展,所以hangfire版本能夠一直更新,如今已經更新最新版,支持秒級任務git

gitHub地址github

 

因爲更新到最新版hangfire 1.7支持秒級任務,使用的在線表達式生成部分表達式有問題,注掉了秒級任務表達式生成,有時間須要詳細測試更改,能夠參考(hangfire官方提供的表達式)web

如今已經實現的功能有:

1,部署及調試:只須要配置數據庫鏈接,而後編譯便可運行,無需建表,支持(redis,mysql, sqlserver)其餘數據庫暫時用不到沒測試。推薦使用redis集羣。項目中直接添加了redis的存儲包,已經更新StackExchange.Redis到最新版本方便拓展,調試時能夠直接調試。部署,只須要發佈項目,運行建立windows服務的bat命令,命令已經包含在項目中,或者發佈至Linux。redis

 

2,週期任務:支持在控制面板頁面上添加週期任務,編輯週期任務,刪除週期任務,手動觸發週期任務,暫停和繼續週期任務(暫停實現的原理是經過set中添加屬性,在job執行前,過濾掉,直接跳過執行,由於hangfire中job一旦建立就失去了控制權,只能經過過濾器去攔截),任務暫停後會查詢狀態並渲染面板列表爲紅色字體方便查找哪一個任務被暫停。sql

3,計劃任務在做業選項卡中,計劃做業中能夠實現添加計劃任務,計劃任務可使任務在指定的分鐘後執行,只執行一次。數據庫

 

4,只讀面板經過配置的用戶名密碼,使用戶只具備讀取面板的權限,這樣能夠防止誤操做json

 

 

 1  //只讀面板,只能讀取不能操做
 2             app.UseHangfireDashboard("/job-read", new DashboardOptions  3  {  4                 AppPath = "#",//返回時跳轉的地址
 5                 DisplayStorageConnectionString = false,//是否顯示數據庫鏈接信息
 6                 IsReadOnlyFunc = Context =>
 7  {  8                     return true;  9  }, 10                 Authorization = new[] { new BasicAuthAuthorizationFilter(new BasicAuthAuthorizationFilterOptions 11  { 12                     RequireSsl = false,//是否啓用ssl驗證,即https
13                     SslRedirect = false, 14                     LoginCaseSensitive = true, 15                     Users = new [] 16  { 17                         new BasicAuthAuthorizationUser 18  { 19                             Login = "read", 20                             PasswordClear = "only"
21  }, 22                         new BasicAuthAuthorizationUser 23  { 24                             Login = "test", 25                             PasswordClear = "123456"
26  }, 27                         new BasicAuthAuthorizationUser 28  { 29                             Login = "guest", 30                             PasswordClear = "123@123"
31  } 32  } 33  }) 34  } 35             });
View Code

 

 

 5,郵件推送:目前使用的方式是,任務錯誤重試達到指定次數後,發送郵件通知,使用的MailKit

 1   catch (Exception ex)  2  {  3                 //獲取重試次數
 4                 var count = context.GetJobParameter<string>("RetryCount");  5  context.SetTextColor(ConsoleTextColor.Red);  6                 //signalR推送  7                 //SendRequest(ConfigSettings.Instance.URL+"/api/Publish/EveryOne", "測試");
 8                 if (count == "3")//重試達到三次的時候發郵件通知
 9  { 10  SendEmail(item.JobName, item.Url, ex.ToString()); 11  } 12                 logger.Error(ex, "HttpJob.Excute"); 13                 context.WriteLine($"執行出錯:{ex.Message}"); 14                 throw;//不拋異常不會執行重試操做
15             }
View Code
 1 /// <summary>
 2         /// 郵件模板  3         /// </summary>
 4         /// <param name="jobname"></param>
 5         /// <param name="url"></param>
 6         /// <param name="exception"></param>
 7         /// <returns></returns>
 8         private static string SethtmlBody(string jobname, string url, string exception)  9  { 10             var htmlbody = $@"<h3 align='center'>{HangfireHttpJobOptions.SMTPSubject}</h3> 11  <h3>執行時間:</h3> 12  <p> 13  {DateTime.Now} 14  </p> 15  <h3> 16  任務名稱:<span> {jobname} </span><br/> 17  </h3> 18  <h3> 19  請求路徑:{url} 20  </h3> 21  <h3><span></span> 22  執行結果:<br/> 23  </h3> 24  <p> 25  {exception} 26  </p> "; 27             return htmlbody; 28         }
郵件模板
 1  //使用redis
 2                         config.UseRedisStorage(Redis, new Hangfire.Redis.RedisStorageOptions()  3  {  4                             FetchTimeout=TimeSpan.FromMinutes(5),  5                             Prefix = "{hangfire}:",  6                             //活動服務器超時時間
 7                             InvisibilityTimeout = TimeSpan.FromHours(1),  8                             //任務過時檢查頻率
 9                             ExpiryCheckInterval = TimeSpan.FromHours(1), 10                             DeletedListSize = 10000, 11                             SucceededListSize = 10000
12  }) 13                         .UseHangfireHttpJob(new HangfireHttpJobOptions() 14  { 15                             SendToMailList = HangfireSettings.Instance.SendMailList, 16                             SendMailAddress = HangfireSettings.Instance.SendMailAddress, 17                             SMTPServerAddress = HangfireSettings.Instance.SMTPServerAddress, 18                             SMTPPort = HangfireSettings.Instance.SMTPPort, 19                             SMTPPwd = HangfireSettings.Instance.SMTPPwd, 20                             SMTPSubject = HangfireSettings.Instance.SMTPSubject 21                         })
配置郵件參數

 

6,signalR 推送:宿主程序使用的weapi,所以能夠經過webapi推送,這樣作的好處是能夠將服務看成推送服務使用,第三方接口也能夠利用此來推送,

 

 1  /// <summary>
 2        ///用戶加入組處理  3        /// </summary>
 4        /// <param name="userid">用戶惟一標識</param>
 5        /// <param name="GroupName">組名稱</param>
 6        /// <returns></returns>
 7         public Task InitUsers(string userid,string GroupName)  8  {  9             Console.WriteLine($"{userid}加入用戶組"); 10  Groups.AddToGroupAsync(Context.ConnectionId, GroupName); 11             SignalrGroups.UserGroups.Add(new SignalrGroups() 12  { 13                 ConnectionId = Context.ConnectionId, 14                 GroupName = GroupName, 15                 UserId = userid 16  }); 17             return Clients.All.SendAsync("UserJoin", "用戶組數據更新,新增id爲:" + Context.ConnectionId + " pid:" + userid); 18  } 19         /// <summary>
20         /// 斷線的時候處理 21         /// </summary>
22         /// <param name="exception"></param>
23         /// <returns></returns>
24         public override Task OnDisconnectedAsync(Exception exception) 25  { 26             //掉線移除用戶,不給其推送
27             var user = SignalrGroups.UserGroups.FirstOrDefault(c => c.ConnectionId == Context.ConnectionId); 28 
29             if (user != null) 30  { 31                 Console.WriteLine($"用戶:{user.UserId}已離線"); 32  SignalrGroups.UserGroups.Remove(user); 33  Groups.RemoveFromGroupAsync(Context.ConnectionId, user.GroupName); 34  } 35             return base.OnDisconnectedAsync(exception); 36         }
Hub定義
 1  /// <summary>
 2         /// 單個connectionid推送  3         /// </summary>
 4         /// <param name="groups"></param>
 5         /// <returns></returns>
 6         [HttpPost, Route("AnyOne")]  7         public IActionResult AnyOne([FromBody]IEnumerable<SignalrGroups> groups)  8  {  9             if (groups != null && groups.Any()) 10  { 11                 var ids = groups.Select(c => c.UserId); 12                 var list = SignalrGroups.UserGroups.Where(c => ids.Contains(c.UserId)); 13                 foreach (var item in list) 14                     hubContext.Clients.Client(item.ConnectionId).SendAsync("AnyOne", $"{item.ConnectionId}: {item.Content}"); 15  } 16             return Ok(); 17  } 18 
19         /// <summary>
20         /// 所有推送 21         /// </summary>
22         /// <param name="message"></param>
23         /// <returns></returns>
24         [HttpPost, Route("EveryOne")] 25         public IActionResult EveryOne([FromBody] MSG body) 26  { 27             var data = HttpContext.Response.Body; 28             hubContext.Clients.All.SendAsync("EveryOne", $"{body.message}"); 29             return Ok(); 30  } 31 
32         /// <summary>
33         /// 單個組推送 34         /// </summary>
35         /// <param name="group"></param>
36         /// <returns></returns>
37         [HttpPost, Route("AnyGroups")] 38         public IActionResult AnyGroups([FromBody]SignalrGroups group) 39  { 40             if (group != null) 41  { 42                 hubContext.Clients.Group(group.GroupName).SendAsync("AnyGroups", $"{group.Content}"); 43  } 44             return Ok(); 45         }
推送接口定義

 

7,接口健康檢查:由於主要用來調用api接口,所以集成接口健康檢查仍是頗有必要的,目前使用的方式是配置文件中添加須要檢查的地址

 1 /*健康檢查配置項*/
 2   "HealthChecks-UI": {  3     /*檢查地址,能夠配置當前程序和外部程序*/
 4     "HealthChecks": [  5  {  6         "Name": "Hangfire Api 健康檢查",  7         "Uri": "http://localhost:9006/healthz"
 8  }  9  ], 10     /*須要檢查的Api地址*/
11     "CheckUrls": [ 12  { 13         "Uri": "http://localhost:17600/CityService.svc/HealthyCheck", 14         "httpMethod": "Get"
15  }, 16  { 17         "Uri": "http://localhost:9098/CheckHelath", 18         "httpMethod": "Post"
19  }, 20  { 21         "Uri": "http://localhost:9067/GrtHelathCheck", 22         "httpMethod": "Get"
23  }, 24  { 25         "Uri": "http://localhost:9043/GrtHelathCheck", 26         "httpMethod": "Get"
27  } 28  ], 29     "Webhooks": [], //鉤子配置
30     "EvaluationTimeOnSeconds": 10, //檢測頻率
31     "MinimumSecondsBetweenFailureNotifications": 60, //推送間隔時間
32     "HealthCheckDatabaseConnectionString": "Data Source=\\healthchecksdb" //-> sqlite庫存儲檢查配置及日誌信息
33   }
健康檢查相關配置

後臺會根據配置的指定間隔去檢查服務接口是否能夠正常訪問,(這個中間件能夠實現不少檢查功能,包括網絡,數據庫,mq等,支持webhook推送等豐富功能,系統用不到所以沒有添加)

健康檢查的配置

1  //添加健康檢查地址
2             HangfireSettings.Instance.HostServers.ForEach(s =>
3  { 4                 services.AddHealthChecks().AddUrlGroup(new Uri(s.Uri), s.httpMethod.ToLower() == "post" ? HttpMethod.Post : HttpMethod.Get, $"{s.Uri}"); 5             });
健康檢查地址添加
 1  app.UseHealthChecks("/healthz", new HealthCheckOptions()  2  {  3                 Predicate = _ => true,  4                 ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse  5  });  6             app.UseHealthChecks("/health", options);//獲取自定義格式的json數據
 7             app.UseHealthChecksUI(setup =>
 8  {  9                 setup.UIPath = "/hc"; // 健康檢查的UI面板地址
10                 setup.ApiPath = "/hc-api"; // 用於api獲取json的檢查數據
11             });
健康檢查中間件配置

其中,ui配置路徑是在面板中展現檢查結果須要使用的地址

api地址,能夠經過接口的方式來調用檢查結果,方便在第三方系統中使用,其數據格式能夠自定義

 經過接口調用

 1 [{  2     "id": 1,  3     "status": "Unhealthy",  4     "onStateFrom": "2019-04-07T18:00:09.6996751+08:00",  5     "lastExecuted": "2019-04-07T18:05:03.4761739+08:00",  6     "uri": "http://localhost:53583/healthz",  7     "name": "Hangfire Api 健康檢查",  8     "discoveryService": null,  9     "entries": [{ 10         "id": 1, 11         "name": "http://localhost:17600/CityService.svc/HealthyCheck", 12         "status": "Unhealthy", 13         "description": "An error occurred while sending the request.", 14         "duration": "00:00:04.3907375"
15  }, { 16         "id": 2, 17         "name": "http://localhost:9098/CheckHelath", 18         "status": "Unhealthy", 19         "description": "An error occurred while sending the request.", 20         "duration": "00:00:04.4140310"
21  }, { 22         "id": 3, 23         "name": "http://localhost:9067/GrtHelathCheck", 24         "status": "Unhealthy", 25         "description": "An error occurred while sending the request.", 26         "duration": "00:00:04.3847367"
27  }, { 28         "id": 4, 29         "name": "http://localhost:9043/GrtHelathCheck", 30         "status": "Unhealthy", 31         "description": "An error occurred while sending the request.", 32         "duration": "00:00:04.4499007"
33  }], 34     "history": [] 35 }]
接口返回數據原始格式
 1 {  2     "status": "Unhealthy",  3     "errors": [{  4         "key": "http://localhost:17600/CityService.svc/HealthyCheck",  5         "value": "Unhealthy"
 6  }, {  7         "key": "http://localhost:9098/CheckHelath",  8         "value": "Unhealthy"
 9  }, { 10         "key": "http://localhost:9067/GrtHelathCheck", 11         "value": "Unhealthy"
12  }, { 13         "key": "http://localhost:9043/GrtHelathCheck", 14         "value": "Unhealthy"
15  }] 16 }
接口返回數據處理後格式
 1  //重寫json報告數據,可用於遠程調用獲取健康檢查結果
 2             var options = new HealthCheckOptions  3  {  4                 ResponseWriter = async (c, r) =>
 5  {  6                     c.Response.ContentType = "application/json";  7 
 8                     var result = JsonConvert.SerializeObject(new
 9  { 10                         status = r.Status.ToString(), 11                         errors = r.Entries.Select(e => new { key = e.Key, value = e.Value.Status.ToString() }) 12  }); 13                     await c.Response.WriteAsync(result); 14  } 15             };
處理方式

 

8,經過接口添加任務:添加編輯週期任務,添加計劃任務,觸發週期任務,刪除週期任務,多個任務連續一次執行的任務

 1  /// <summary>
 2         /// 添加一個隊列任務當即被執行  3         /// </summary>
 4         /// <param name="httpJob"></param>
 5         /// <returns></returns>
 6         [HttpPost, Route("AddBackGroundJob")]  7         public JsonResult AddBackGroundJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)  8  {  9             var addreslut = string.Empty;  10             try
 11  {  12                 addreslut = BackgroundJob.Enqueue(() => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null));  13  }  14             catch (Exception ec)  15  {  16                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  17  }  18             return Json(new Message() { Code = true, ErrorMessage = "" });  19  }  20 
 21         /// <summary>
 22         /// 添加一個週期任務  23         /// </summary>
 24         /// <param name="httpJob"></param>
 25         /// <returns></returns>
 26         [HttpPost, Route("AddOrUpdateRecurringJob")]  27         public JsonResult AddOrUpdateRecurringJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)  28  {  29             try
 30  {  31                 RecurringJob.AddOrUpdate(httpJob.JobName, () => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null), httpJob.Corn, TimeZoneInfo.Local);  32  }  33             catch (Exception ec)  34  {  35                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  36  }  37             return Json(new Message() { Code = true, ErrorMessage = "" });  38  }  39 
 40         /// <summary>
 41         /// 刪除一個週期任務  42         /// </summary>
 43         /// <param name="jobname"></param>
 44         /// <returns></returns>
 45         [HttpGet,Route("DeleteJob")]  46         public JsonResult DeleteJob(string jobname)  47  {  48             try
 49  {  50  RecurringJob.RemoveIfExists(jobname);  51  }  52             catch (Exception ec)  53  {  54                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  55  }  56             return Json(new Message() { Code = true, ErrorMessage = "" });  57  }  58         /// <summary>
 59         /// 手動觸發一個任務  60         /// </summary>
 61         /// <param name="jobname"></param>
 62         /// <returns></returns>
 63         [HttpGet, Route("TriggerRecurringJob")]  64         public JsonResult TriggerRecurringJob(string jobname)  65  {  66             try
 67  {  68  RecurringJob.Trigger(jobname);  69  }  70             catch (Exception ec)  71  {  72                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  73  }  74             return Json(new Message() { Code = true, ErrorMessage = "" });  75  }  76         /// <summary>
 77         /// 添加一個延遲任務  78         /// </summary>
 79         /// <param name="httpJob">httpJob.DelayFromMinutes(延遲多少分鐘執行)</param>
 80         /// <returns></returns>
 81         [HttpPost, Route("AddScheduleJob")]  82         public JsonResult AddScheduleJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)  83  {  84             var reslut = string.Empty;  85             try
 86  {  87                 reslut = BackgroundJob.Schedule(() => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null), TimeSpan.FromMinutes(httpJob.DelayFromMinutes));  88  }  89             catch (Exception ec)  90  {  91                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  92  }  93             return Json(new Message() { Code = true, ErrorMessage = "" });  94  }  95         /// <summary>
 96         /// 添加連續任務,多個任務依次執行,只執行一次  97         /// </summary>
 98         /// <param name="httpJob"></param>
 99         /// <returns></returns>
100         [HttpPost, Route("AddContinueJob")] 101         public JsonResult AddContinueJob([FromBody] List<Hangfire.HttpJob.Server.HttpJobItem> httpJobItems) 102  { 103             var reslut = string.Empty; 104             var jobid = string.Empty; 105             try
106  { 107                 httpJobItems.ForEach(k =>
108  { 109                     if (!string.IsNullOrEmpty(jobid)) 110  { 111                         jobid = BackgroundJob.ContinueJobWith(jobid, () => RunContinueJob(k)); 112  } 113                     else
114  { 115                         jobid = BackgroundJob.Enqueue(() => Hangfire.HttpJob.Server.HttpJob.Excute(k, k.JobName, null)); 116  } 117  }); 118                 reslut = "true"; 119  } 120             catch (Exception ec) 121  { 122                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() }); 123  } 124             return Json(new Message() { Code = true, ErrorMessage = "" }); 125         }
經過接口添加任務

這樣作的好處是有效利用了宿主的webapi,並且無需登陸控制面板操做就能實現任務管理,方便集成管理到其餘系統中

 

防止多個實例的任務並行執行,即一個任務未執行完成,另外一個相同的任務開始執行,可使用分佈式鎖來解決

 經過特性來添加任務重試時間間隔(hangfire 1.7 新增,單位/秒),重試次數,隊列名稱,任務名稱,以及分佈式鎖超時時間

 1 /// <summary>
 2         /// 執行任務,DelaysInSeconds(重試時間間隔/單位秒)  3         /// </summary>
 4         /// <param name="item"></param>
 5         /// <param name="jobName"></param>
 6         /// <param name="context"></param>
 7         [AutomaticRetry(Attempts = 3, DelaysInSeconds = new[] { 30, 60, 90 }, LogEvents = true, OnAttemptsExceeded = AttemptsExceededAction.Fail)]  8         [DisplayName("Api任務:{1}")]  9         [Queue("apis")] 10         [JobFilter(timeoutInSeconds: 3600)]
配置分佈式鎖超時時間

 

 1 //設置分佈式鎖,分佈式鎖會阻止兩個相同的任務併發執行,用任務名稱和方法名稱做爲鎖
 2             var jobresource = $"{filterContext.BackgroundJob.Job.Args[1]}.{filterContext.BackgroundJob.Job.Method.Name}";  3             var locktimeout = TimeSpan.FromSeconds(_timeoutInSeconds);  4             try
 5  {  6                 //判斷任務是否被暫停
 7                 using (var connection = JobStorage.Current.GetConnection())  8  {  9                     var conts = connection.GetAllItemsFromSet($"JobPauseOf:{filterContext.BackgroundJob.Job.Args[1]}"); 10                     if (conts.Contains("true")) 11  { 12                         filterContext.Canceled = true;//任務被暫停不執行直接跳過
13                         return; 14  } 15  } 16                 //申請分佈式鎖
17                 var distributedLock = filterContext.Connection.AcquireDistributedLock(jobresource, locktimeout); 18                 filterContext.Items["DistributedLock"] = distributedLock; 19  } 20             catch (Exception ec) 21  { 22                 //獲取鎖超時,取消任務,任務會默認置爲成功
23                 filterContext.Canceled = true; 24                 logger.Info($"任務{filterContext.BackgroundJob.Job.Args[1]}超時,任務id{filterContext.BackgroundJob.Id}"); 25             }
過濾器添加分佈式鎖

 

1  if (!filterContext.Items.ContainsKey("DistributedLock")) 2  { 3                 throw new InvalidOperationException("找不到分佈式鎖,沒有爲該任務申請分佈式鎖."); 4  } 5             //釋放分佈式鎖
6             var distributedLock = (IDisposable)filterContext.Items["DistributedLock"]; 7             distributedLock.Dispose();
釋放分佈式鎖

 

經過過濾器來設置任務過時時間,過時後自動在數據庫刪除歷史記錄

 

1 public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction) 2  { 3             //設置過時時間,任務將在三天後過時,過時的任務會自動被掃描並刪除
4             context.JobExpirationTimeout = TimeSpan.FromDays(3); 5         }
設置任務過時時間

 

redis集羣下,測試秒級任務

集羣爲windws環境下,一個主節點四個從節點,(使用時須要在redis鏈接中配置所有集羣鏈接,主節點和從節點),目前用不到linux環境,沒有進行測試。

相關文章
相關標籤/搜索