採用簡易的環形延時隊列處理秒級定時任務的解決方案

 業務背景

在稍微複雜點業務系統中,不可避免會碰到作定時任務的需求,好比淘寶的交易超時自動關閉訂單、超時自動確認收貨等等。對於一些定時做業比較多的系統,一般都會搭建專門的調度平臺來管理,經過建立定時器來週期性執行任務。如剛纔所說的場景,咱們能夠給訂單建立一個專門的任務來處理交易狀態,每秒輪詢一次訂單表,找出那些符合超時條件的訂單而後標記狀態。這是最簡單粗暴的作法,但明顯也很low,本身都下不去手寫這樣的代碼,全部必需要找個更好的方案。web

回到真實項目中的場景,系統中某個活動上線後要給目標用戶發送短信通知,這些通知須要按時間點批量發送。雖然已經基於quartz.net給系統搭建了任務調度平臺,但着實不想用上述方案來實現。在網上各類搜索和思考,找到一篇文章讓我眼前一亮,稍加分析發現裏面的思路徹底符合如今的場景,因而決定在本身項目中實現出來。數據庫

 

原理分析

 這種方案的核心就是構造一種數據結構,稱之爲環形隊列,但實際上仍是一個數組,加上對它的循環遍歷,達到一種環狀的假象。而後再配合定時器,就能夠實現按需延時的效果。上面提到的文章中也介紹了實現思路,這裏我採用個人理解再更加詳細的解釋一下。數組

咱們先爲這個數組分配一個固定大小的空間,好比60,每一個數組的元素用來存聽任務的集合。而後開啓一個定時器每隔一秒來掃描這個數組,掃完一圈恰好是一分鐘。若是提早設置好任務被掃描的圈數(CycleNum)和在數組中的位置(Slot),在恰好掃到數組的Slot位置時,集合裏那些CycleNum爲0的任務就是達到觸發條件的任務,拉出來作業務操做而後移除掉,其餘的把圈數減掉一次,而後留到下次繼續掃描,這樣就實現了延時的效果。原理以下圖所示:數據結構

能夠看出中間的重點是計算出每一個任務所在的位置以及須要循環的圈數。假設當前時間爲15:20:08,當前掃描位置是2,個人任務要在15:22:35這個時刻觸發,也就是147秒後。那麼我須要循環的圈數就是147/60=2圈,須要被掃描的位置就是(147+2)%60=29的地方。計算好任務的座標後塞到數組中屬於它的位置,而後靜靜等待被消費就好啦。測試

 

擼碼實現

光講原理不上代碼怎麼能行呢,根據上面的思路,下面一步步在.net平臺下實現出來。ui

先作一些基礎封裝。spa

首先構造任務參數的基類,用來記錄任務的位置信息和定義業務回調方法:.net

    public class DelayQueueParam
    {
        internal int Slot { get; set; }

        internal int CycleNum { get; set; }

        public Action<object> Callback { get; set; }
    }

接下來是核心地方。再構造隊列的泛型類,真實類型必須派生自上面的基類,用來擴展一些業務字段方便消費時使用。隊列的主要屬性有當前位置指針以及數組容器,主要的操做有插入、移除和消費。插入任務時須要傳入執行時間,用來計算這個任務的座標。線程

    public class DelayQueue<T> where T : DelayQueueParam
    {
        private List<T>[] queue;

        private int currentIndex = 1;

        public DelayQueue(int length)
        {
            queue = new List<T>[length];
        }

        public void Insert(T item, DateTime time)
        {
            //根據消費時間計算消息應該放入的位置
            var second = (int)(time - DateTime.Now).TotalSeconds;
            item.CycleNum = second / queue.Length;
            item.Slot = (second + currentIndex) % queue.Length;
            //加入到延時隊列中
            if (queue[item.Slot] == null)
            {
                queue[item.Slot] = new List<T>();
            }
            queue[item.Slot].Add(item);
        }

        public void Remove(T item)
        {
            if (queue[item.Slot] != null)
            {
                queue[item.Slot].Remove(item);
            }
        }

        public void Read()
        {
            if (queue.Length >= currentIndex)
            {
                var list = queue[currentIndex - 1];
                if (list != null)
                {
                    List<T> target = new List<T>();
                    foreach (var item in list)
                    {
                        if (item.CycleNum == 0)
                        {
                            //在本輪命中,用單獨線程去執行業務操做
                            Task.Run(()=> { item.Callback(item); });
                            target.Add(item);
                        }
                        else
                        {
                            //等下一輪
                            item.CycleNum--;
                            System.Diagnostics.Debug.WriteLine($"@@@@@索引:{item.Slot},剩餘:{item.CycleNum}");
                        }
                    }
                    //把已過時的移除掉
                    foreach (var item in target)
                    {
                        list.Remove(item);
                    }
                }
                currentIndex++;
                //下一遍從頭開始
                if (currentIndex > queue.Length)
                {
                    currentIndex = 1;
                }
            }
        }
    }

接下來是使用方法。設計

建立一個管理隊列實例的靜態類,裏面封裝對隊列的操做:

    public static class NotifyPlanManager
    {
        private static DelayQueue<NotifyPlan> _queue = new DelayQueue<NotifyPlan>(60);

        public static void Insert(NotifyPlan plan, DateTime time)
        {
            _queue.Insert(plan, time);
        }

        public static void Read()
        {
            _queue.Read();
        }
    }

構建咱們的實際業務參數類,派生自DelayQueueParam:

    public class NotifyPlan : DelayQueueParam
    {
        public Guid CamId { get; set; }

        public int PreviousTotal { get; set; }

        public int Amount { get; set; }
    }

生產端往隊列中插入數據:

    Action<object> callback = (result) =>
    {
        var np = result as NotifyPlan;
        //這裏作本身的業務操做
        //舉個例子:
        Debug.WriteLine($"活動ID:{np.CamId},已發送數量:{np.PreviousTotal},本次發送數量:{np.Amount}");
    };
    NotifyPlanManager.Insert(new NotifyPlan
    {
        Amount = set.MainAmount,
        CamId = camId,
        PreviousTotal = 0,
        Callback = callback
    }, smsTemplate.SendDate);

再建立一個每秒執行一次的定時器用作消費端,我這裏使用的是FluentScheduler,核心代碼:

    internal class NotifyPlanJob : IJob
    {
        /// <summary>
        /// 執行計劃
        /// </summary>
        public void Execute()
        {
            NotifyPlanManager.Read();
        }
    }

    internal class JobFactory : Registry
    {
        public JobFactory()
        {
            //每秒運行一次
            Schedule<NotifyPlanJob >().ToRunEvery(1).Seconds();
        }
    }

  JobManager.Initialize(new JobFactory());

而後開啓調試運行,打開本機的系統時間面板,對着時間看輸出結果。親測有效。

 

總結

 這種方案的好處是避免了頻繁地掃描數據庫和沒必要要的業務操做,另外也很方便控制時間精度。帶來的問題是若是web服務異常或重啓可能會發生任務丟失的狀況,我目前的處理方法是在數據庫中標記任務狀態,服務啓動時把狀態爲「排隊中」的任務從新加載到隊列中等待消費。

以上方案在單機環境測試沒問題,多節點狀況下暫時沒有深究。如有設計實現上的缺陷,歡迎討論與指正,要是有更好的方案,那就當拋磚引玉,再好不過了~

相關文章
相關標籤/搜索