.net core實踐系列之短信服務-Sikiro.SMS.Job服務的實現

原文: .net core實踐系列之短信服務-Sikiro.SMS.Job服務的實現

前言

本篇會繼續講解Sikiro.SMS.Job服務的實現,在我寫第一篇的時候,我就發現我當時設計的架構裏Sikiro.SMS.Job這個能夠選擇不須要,而使用MQ代替。可是爲了說明調度任務使用實現也堅持寫了下。後面會一篇針對架構、實現優化的講解。html

源碼地址:https://github.com/SkyChenSky/Sikiro.SMSgit

Quartz的簡介

Quartz.NET是一款功能齊全的開源做業調度框架,小至的應用程序,大到企業系統均可以適用。Quartz是做者James House用JAVA語言編寫的,而Quartz.NET是從Quartz移植過來的C#版本。github

Quartz.Net的做用

  • Quartz.Net是多線程的,容許多個JOB同時執行。
  • Quartz.Net能夠進行持久化,結合管理後臺能夠進行可視化的監控
  • Quartz.Net提供API進行遠程操控,結合管理後臺能夠進行運維管理

在通常企業,能夠利用Quartz.Net框架作各類的定時任務,例如,數據遷移、跑報表等等。sql

Cron表達式

字段名 是否必填 值範圍 特殊字符
Seconds YES 0-59 , - * /
Minutes YES 0-59 , - * /
Hours YES 0-23 , - * /
Day of month YES 1-31 , - * ? / L W
Month YES 1-12 or JAN-DEC , - * /
Day of week YES 1-7 or SUN-SAT , - * ? / L #
Year NO empty, 1970-2099 , - * /

缺點

Quartz.Net的缺點很明顯,沒有自帶的管理後臺,而同款的Hangfir調度任務框架則會有更加良好的易用性。可是在Github上有很多人開源了Quartz.Net的管理後臺,對此做爲了彌補。多線程

其餘

其餘Quartz.Net的信息能夠看我以前記錄的一篇文章《Quartz.NET的使用(附源碼)架構

Quartz.Net DEMO:https://github.com/SkyChenSky/QuartzDotNetDemo.git併發

業務流程

從MongoDB持久化的數據,查詢出狀態爲待處理而且定時時間小於當前時間的數據。經過Mongo驅動提供的FindOneAndUpdate對文檔進行原子性操做(更新中間狀態並查詢出剛更新的文檔)。若是有數據則發送到MQ,由Sikiro.SMS.Bus進行訂閱發送,由於本次有數據,我認爲可能還會有其餘須要發送的數據,所以馬上調用JOB自身方法,進行下一條須要處理的數據進行發送。若是這次JOB的執行並無數據,那麼認爲接下來一段時間沒有須要處理的數據,此次調度結束。框架

TimeSendSms示例

public class TimeSendSms : BaseJob
    {
        private readonly SmsService _smsService;
        private readonly IBus _bus;

        public TimeSendSms(SmsService smsService, IBus bus)
        {
            _smsService = smsService;
            _bus = bus;
        }

        protected override void ExecuteBusiness()
        {
            _smsService.GetToBeSend();

            if (_smsService.Sms != null)
                _bus.Publish(_smsService.Sms.MapTo<SmsModel, SmsQueueModel>());

            _smsService.ContinueDo(ExecuteBusiness);
        }

        protected override void OnException()
        {
            _smsService.RollBack();
        }
    }

模板模式

Job的輪詢處理流程基本類似,查詢出須要執行數據-遍歷業務處理-若是有異常則特殊處理,所以針對相似流程相同,可是實現有差別的程序,咱們可使用模板模式。運維

 public abstract class BaseJob : IJob
    {
        private void OnException(Action action)
        {
            try
            {
                action();
            }
            catch (Exception e)
            {
                e.WriteToFile();
                OnException();
            }
        }

        public Task Execute(IJobExecutionContext context)
        {
            OnException(ExecuteBusiness);

            return null;
        }

        protected virtual void OnException()
        {

        }

        protected abstract void ExecuteBusiness();
    }

 

Mongo的原子性

原子性

原子是物理概念,指的是指化學反應不可再分的基本微粒。而計算機領域的原子性強調的對象是操做(指令、事務)。咱們所說的指令組是原子操做,意思要麼一塊兒成功,要麼一塊兒失敗。不容許2個指令裏,一個成功一個失敗的狀況存在。ide

MongoDB 原子操做

MongoDB的原子操做就是要麼這個文檔完整的保存到Mongodb,要麼沒有保存到Mongodb,不會出現查詢到的文檔沒有保存完整的狀況。

MongoDB的文檔的保存,修改,刪除等操做都是原子性,除此以外還提供了FindOneAndDelete、FindOneAndUpdate、FindOneAndReplace等原子操做。

以FindOneAndUpdate爲例,對某文檔FindOneAndUpdate,能夠文檔B進行Update操做完成後返回出文檔B的結果,根據參數返回結果是更新前仍是更新後(通常咱們須要更新後)。

而這FindOneAndUpdate的操做對於咱們更新到中間狀態的很是實用:

  • 避免進行Update後沒法良好的查詢到剛Update的文檔
  • 避免應用集羣部署時批量更新後,沒法良好分配任務
  • 批量更新多個文檔須要isolated標識隔離,全局鎖在大併發狀況下性能並不樂觀

雖然以上能夠經過更新時標識版本號進行解決,這無疑增長實現難度。

MongoDB鎖機制

Mongodb併發操做又讀寫鎖來進行控制。

簡單來講

當進行讀操做的時候會加讀鎖,這個時候其餘讀操做能夠也得到讀鎖,可是不能加寫鎖,也就是說不能進行寫操做。

當進行寫操做的時候會加寫鎖,這個時候其餘操做沒法加任何鎖,也就是說不能進行其餘的讀操做和寫操做。

多個JOB的併發性

綜上所述,落實到咱們應用場景,在部署多個調度任務服務,或者JOB多個線程去跑時,咱們可使用FindOneAndUpdate,每一個調度任務每次只處理一個文檔,Update操做的時候會進行寫鎖阻塞其餘進程(進程)的寫操做。那麼就能夠保證每一個調度任務均可以只處理惟一一個有效的文檔,避免重複處理。

下面是個人Sikiro.Nosql.Mongo的FindOneAndUpdate封裝示例,由於Update字段的不友好,因此我封裝了一下Lambda表達式,ReturnDocument = ReturnDocument.After標識響應數據是更新前仍是更新後的文檔。

public T GetAndUpdate<T>(string database, string collection, Expression<Func<T, bool>> predicate, Expression<Func<T, T>> updateExpression)
        {
            var db = _mongoClient.GetDatabase(database);
            var col = db.GetCollection<T>(collection);

            var updateDefinitionList = MongoExpression<T>.GetUpdateDefinition(updateExpression);

            var updateDefinitionBuilder = new UpdateDefinitionBuilder<T>().Combine(updateDefinitionList);

            return col.FindOneAndUpdate(predicate, updateDefinitionBuilder, new FindOneAndUpdateOptions<T, T>
            {
                ReturnDocument = ReturnDocument.After
            });

SQL Server的UpdateSelect

SQL Server的操做也具備上述FindOneAndUpdate的功能,咱們公司成他爲UpdateSelect,下面是示例代碼:

UPDATE TOP ( 100 )
        SYS_USER WITH ( UPDLOCK, READPAST )
SET     USER_STATUS = 1
OUTPUT  INSERTED.[USER_NAME] ,
        INSERTED.SYS_USERID ,
        INSERTED.EMAIL
FROM    SYS_USER
WHERE   CREATE_DATETIME < '2018-09-13'
        AND USER_STATUS = 2;

結尾

本篇介紹了調度任務結合MongoDB原子操做的使用,使得調度任務服務能夠具備良好的伸縮性。若是有任何建議與問題能夠在下方評論反饋給我。

相關文章
相關標籤/搜索