最近遇到了這樣的場景:每隔一段時間,須要在後臺使用隊列對一批數據進行業務處理。數組
Quartz.NET是一種選擇,在 .NET Core中,可使用IHostedService
執行後臺定時任務。在本篇中,首先嚐試把隊列還原到最簡單、原始的狀態,而後給出以上場景問題的具體解決方案。安全
假設一個隊列有8個元素。如今abcd依次進入隊列。網絡
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
a | b | c | d | ||||
head | tail |
ab依次出隊列。併發
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
c | d | ||||||
head | tail |
能夠想象,隨着不斷地入列出列,head和tail的位置不斷日後,當tail在7號位的時候,雖然隊列裏還有空間,但此時數據就沒法入隊列了。app
如何才能夠繼續入隊列呢?ide
首先想到的是數據搬移。當數據沒法進入隊列,首先讓隊列項出列,進入到另一個新隊列,這個新隊列就能夠再次接收數據入隊列了。可是,搬移整個隊列中的數據的時間複雜度爲O(n),而原先出隊列的時間複雜度是O(1),這種方式不夠理想。ui
還有一種思路是使用循環隊列。當tail指向最後一個位置,此時有新的數據進入隊列,tail就來到頭部指向0號位置,這樣這個隊列就能夠循環使用了。線程
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
h | i | j | |||||
head | tail |
如今a入棧。code
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
h | i | j | a | ||||
tail | head |
隊列有不少種實現。接口
好比在生產消費模型中能夠用阻塞隊列。當生產隊列爲空的時候,爲了避免讓消費者取數據,生產隊列的Dequeue行爲會被阻塞;而當生產隊列滿的時候,爲了避免讓更多的數據進來,生產隊列的Enqueue行爲被阻塞。
線程安全的隊列叫併發隊列,如C#中的ConcurrentQueue
。
線程池內部也使用了隊列機制。由於CPU的資源是有限的,過多的線程會致使CPU頻繁地在線程之間切換。線程池內經過維護必定數量的線程來減輕CPU的負擔。當線程池沒有多餘的線程可供使用,請求過來,一種方式是拒絕請求,另一種方式是讓請求隊列阻塞,等到線程池內有線程可供使用,請求隊列出列執行。用鏈表實現的隊列是無界隊列(unbounded queue),這種作法可能會致使過多的請求在排隊,等待響應時間過長。用數組實現的隊列是有界隊列(bounded queue),當線程池已滿,請求過來就會被拒絕。對有界隊列來講數組的大小設置很講究。
來模擬一個數組隊列。
public class ArrayQueue { private string[] items; private int n = 0; //數組長度 private int head = 0; private int tail = 0; public ArrayQueue(int capacity) { n = capacity; items = new string[capacity]; } public bool Enqueue(string item) { if(tail==n){ return false; } items[tail] = item; ++tail; return true; } public string Dequeue() { if(head==null){ return null; } string ret = items[head]; ++head; return ret; } }
以上就是一個最簡單的、用數組實現的隊列。
再次回到要解決的場景問題。解決思路大體是:實現IHostedService
接口,在其中執行定時任務,每次把隊列項放到隊列中,並定義出隊列的方法,在其中執行業務邏輯。
關於隊列,經過如下的步驟使其在後臺運行。
Dictionary<string, MessageQueueItem>
靜態字典集合MessageQueueUtility
類:決定着如何運行,好比隊列執行的間隔時間、垃圾回收MessageQueueThreadUtility
類:維護隊列線程,提供隊列在後臺運行的方法Startup.cs
中的Configure
中調用MessageQueueThreadUtility
中的方法使隊列在後臺運行隊列項(MessageQueueItem)
public class MessageQueueItem { public MessageQueueItem(string key, Action action, string description=null) { Key = key; Action = action; Description = description; AddTime = DateTime.Now; } public string Key{get;set;} public Actioin Action{get;set;} public DateTime AddTime{get;set;} public string Description{get;set;} }
隊列(MessageQueue),維護着針對隊列項的一個靜態字典集合。
public class MessageQueue { public static Dictionary<string, MessageQueueItem> MessageQueueDictionary = new Dictionary<string, MessageQueueItem>(StringComparer.OrdinalIgnoreCase); public static object MessageQueueSyncLock = new object(); public static object OperateLock = new object(); public static void OperateQueue() { lock(OperateLock) { var mq = new MessageQueue(); var key = mq.GetCurrentKey(); while(!string.IsNullOrEmpty(key)) { var mqItem = mq.GetItem(key); mqItem.Action(); mq.Remove(key); key = mq.GetCurrentKey(); } } } public string GetCurrentKey() { lock(MessageQueueSyncLock) { return MessageQueueDictionary.Keys.FirstOrDefault(); } } public MessageQueueItem GetItem(string key) { lock(MessageQueueSyncLock) { if(MessageQueueDictionary.ContainsKey(key)) { return MessageQueueDictionary[key]; } return null; } } public void Remove(string key) { lock(MessageQueueSyncLock) { if(MessageQueueDictionary.ContainsKey(key)) { MessageQueueDictionary.Remove(key); } } } public MessageQueueItem Add(string key, Action actioin) { lock(MessageQueueSyncLock) { var mqItem = new MessageQueueItem(key, action); MessageQueueDictionary[key] = mqItem; return mqItem; } } public int GetCount() { lock(MessageQueueSyncLock) { return MessageQueueDictionary.Count; } } }
MessageQueueUtility
類, 決定着隊列運行的節奏。
public class MessageQueueUtility { private readonly int _sleepMilliSeconds; public MessageQueueUtility(int sleepMilliSeconds=1000) { _sleepMilliSeconds = sleepMilliSeoncds; } ~MessageQueueUtility() { MessageQueue.OperateQueue(); } public void Run { do { MessageQueue.OperateQueue(); Thread.Sleep(_sleepMilliSeconds); } while(true) } }
MessageQueueThreadUtility
類,管理隊列的線程,並讓其在後臺運行。
public static class MessageQueueThreadUtility { public static Dictionary<string, Thread> AsyncThreadCollection = new Dictioanry<string, Thread>(); public static void Register(string threadUniqueName) { { MessageQueueUtility messageQueueUtility = new MessageQueueUtility(); Thread messageQueueThread = new Thread(messageQueueUtility.Run){ Name = threadUniqueName }; AsyncThreadCollection.Add(messageQueueThread.Name, messageQueueThread); } AsyncThreadCollection.Values.ToList().ForEach(z => { z.IsBackground = true; z.Start(); }); } }
Startup.cs
中註冊。
public class Startup { public IServiceProvider ConfigureServices(IServiceCollection services) { ... } public void Configure(IApplicationBuilder app, IHostingEnvironment env...) { RegisterMessageQueueThreads(); } private void RegisterMessageQueueThreads() { MessageQueueThreadUtility.Register(""); } }
最後在IHostedService
的實現類中把隊列項丟給隊列。
public class MyBackgroundSerivce : IHostedService, IDisposable { private Timer _timer; public IServiceProvider Services{get;} public MyBackgroundService(IServiceProvider services) { Serivces = services; } public void Dispose() { _timer?.Dispose(); } public Task StartAsync(CancellationToken cancellationToken) { _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(10)); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _timer?.Change(Timeout.Infinite,0); return Task.CompletedTask; } private void DoWork(object state) { using(var scope = Services.CreateScope()) { using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>()) { ... var mq = new MessageQueue(); mq.Add("somekey", DealQueueItem); } } } private void DealQueueItem() { var mq = new MessageQueue(); var key = mq.GetCurrentKey(); var item = mq.GetItem(key); if(item!=null) { using(var scope = Services.CreateScope()) { using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>()) { //執行業務邏輯 } } } } }
當須要使用上下文的時候,首先經過IServiceProvider
的CreateScope
方法獲得ISerivceScope
,再經過它的ServiceProvider
屬性獲取依賴倒置容器中的上下文服務。
以上,用IHostedService
結合隊列解決了開篇提到的場景問題,若是您有很好的想法,咱們一塊兒交流吧。文中的隊列部分來自"盛派網絡"的Senparc.Weixin SDK源碼。