性能提高五十倍:消息隊列延時聚合通知的重要性

前言後端

這個話題對我而言,是影響好久的事情。從第一次使用消息隊列開始,業務背景是報名系統通知到咱們的系統。正常流量下數據都能正常通知過來,但遇到導入報名人時,採用了Task異步通知,數據量一大,隊列就死了。當時是儘可能採用同步方式,減小併發量。安全

 

後來業務上有了專門的營銷系統,各類數據的增刪改都要進營銷系統,我採用的方式在倉儲層對須要通知的表的任何更新都通知到隊列,這樣的方式幾乎對其餘業務無侵犯。性能優化

 

好處有,壞處也有。不少批量任務的更新若是採用同步方式頻繁通知是十分浪費速度的,既影響數據的更新速度,也對隊列帶來了挑戰。我曾經專門拉了個分支來優化批量任務,但因爲須要涉及不少批量任務最後不了了之。架構

更合理的推送模型應該是這樣,更新消息先到內存隊列,積累一段時間(5秒或30秒)後,聚合到一塊兒推送到消息隊列,以下圖:併發

 

挑戰過去異步

其實也說不上是問題,緣由知道,解決方法也知道。只是現狀還能支撐,就沒有去解決,但這些事情總要面對的。挑戰過去的糟糕代碼,優化提高性能,自己就是一個技術成長的過程。分佈式

邁出第一步ide

第一步固然是Demo,先列出代碼。先貼上一個基於Rabbitmq.Client的客戶端幫助代碼,用於推送單條數據和多條數據。高併發

 
public class RabbitProvider { public const string RABBITMQURL = "amqp://test:test@rabbitmq.login1.com:5672/test"; private static IConnection conn; /// <summary> /// 獲取鏈接。 /// </summary> /// <param name="url"></param> /// <returns></returns> public static IConnection CreateConnection(string url) { ConnectionFactory factory = new ConnectionFactory(); factory.Uri = new Uri(url); factory.AutomaticRecoveryEnabled = true; IConnection conn = factory.CreateConnection(); return conn; } /// <summary> /// 單個 /// </summary> /// <param name="data"></param> public static void Publish<T>(string exchange, string queue, string route, T data) { if (conn == null || !conn.IsOpen) { conn = CreateConnection(RABBITMQURL); } using (IModel model = conn.CreateModel()) { model.ExchangeDeclare(exchange, ExchangeType.Direct); model.QueueDeclare(queue, false, false, false, null); model.QueueBind(queue, exchange, route, null); //IBasicProperties props = ch.CreateBasicProperties(); //FillInHeaders(props); // or similar // byte[] body = ComputeBody(props); // or similar  model.BasicPublish(exchange, route, null, System.Text.Encoding.Default.GetBytes(data.ToString())); } } /// <summary> /// 多條數據 /// </summary> /// <param name="data"></param> public static void Publish<T>(string exchange, string queue, string route, List<T> data) { if (conn == null || !conn.IsOpen) { conn = CreateConnection(RABBITMQURL); } using (IModel model = conn.CreateModel()) { model.ExchangeDeclare(exchange, ExchangeType.Direct); model.QueueDeclare(queue, false, false, false, null); model.QueueBind(queue, exchange, route, null); //IBasicProperties props = ch.CreateBasicProperties(); //FillInHeaders(props); // or similar // byte[] body = ComputeBody(props); // or similar foreach (var item in data) { model.BasicPublish(exchange, route, null, System.Text.Encoding.Default.GetBytes(item.ToString())); } } } }

也許在部分人眼裏能提供支持單條和多條推送的方式已經能解決絕大多數問題,看起來確實如此。但單純的推送批量數據是有業務方發起,是對每一個批量任務都有較大侵入的,雖然它很好,但不夠好。接下來咱們貼上基於BlockingCollection<T>提供的線程安全集合來完成的隊列代碼。性能

 
public class DANQueue<T> : IDANQueue<T> { private static BlockingCollection<DANMessage<T>> GlobalCollection; static DANQueue() { GlobalCollection = new BlockingCollection<DANMessage<T>>(); } /// <summary> /// 添加 /// </summary> /// <param name="item"></param> /// <returns></returns> public static bool TryAdd(DANMessage<T> item) { return GlobalCollection.TryAdd(item); } /// <summary> /// 獲取一個 /// </summary> /// <param name="item"></param> public static DANMessage<T> TryTake() { var msg = new DANMessage<T>(); if (GlobalCollection.TryTake(out msg)) { return msg; } return null; } /// <summary> /// 獲取全部 /// </summary> /// <returns></returns> public static List<DANMessage<T>> TryTakeAll() { var list = new List<DANMessage<T>>(); while (true) { var q = TryTake(); if (q == null) { return list; } list.Add(q); } } /// <summary> /// 統計 /// </summary> public static int Count() { return GlobalCollection.Count; } }

測試業務Demo

 
/// <summary> /// 用戶 /// </summary> public class User { public string Mobile { get; set; } public long CompanyId { get; set; } } /// <summary> /// 倉儲 /// </summary> public class Repository<TDocument> : IRepository<TDocument> { public bool Update(User user) { DANQueue<User>.TryAdd(new DANMessage<User>() { Body = user, Key = user.CompanyId + user.Mobile, Type = typeof(User).Name, TimeStamp = DateTime.Now.Ticks }); return true; } }

分別測試批量更新數據下循環通知和只通知一次耗時,代碼以下:

 
public const string ExchangeStr = "fanTest"; public const string QueueStr = "fanQueueTest"; private static string TypeUserName = typeof(User).Name; static void Main(string[] args) { //這裏就不引入依賴注入了。 Repository<User> repository = new Repository<User>(); Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); for (var i = 0; i <= 1000; i++) { var user = new User() { CompanyId = 13232, Mobile = "11111" + i }; repository.Update(user); RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, DANQueue<User>.TryTake()); } stopwatch.Stop(); Console.WriteLine($"100000UpdateWithPush-Time:" + stopwatch.ElapsedMilliseconds); //批量測試。 stopwatch.Restart(); for (var i = 0; i <= 1000; i++) { var user = new User() { CompanyId = 13232, Mobile = "11111" + i }; repository.Update(user); } RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, DANQueue<User>.TryTakeAll()); stopwatch.Stop(); Console.WriteLine($"100000UpdateDelayPush-Time:" + stopwatch.ElapsedMilliseconds); Console.ReadLine(); } }

結果以下:

 
UpdateWithPush-Time:4103 UpdateDelayPush-Time:73

這裏列舉的只是1000條,當我改爲1萬條的時候,隊列掛了!這充分說明了延時聚合通知的重要性。相同的環境下,循環通知支撐不了1萬,但聚合後只通知一次的狀況下,10萬數據也花了9秒。雙方性能對比結果是指數級的。

 
UpdateDelayPush-Time:9671

引入定時機制

上面已經對比了循環通知和聚合通知的性能,但普通的聚合十分侵入業務。每種類型的業務都須要引入代碼,使用不方便,並且維護起來也麻煩。這時候能夠考慮引入定時任務來處理聚合通知。先來個1百萬的更新。

 
System.Timers.Timer timer = new System.Timers.Timer(5000); timer.Elapsed += Timer_Elapsed; timer.Start(); //批量測試大量數據 for (var i = 0; i <= 1000000; i++) { var user = new User() { CompanyId = 13232, Mobile = "11111" + i }; repository.Update(user); }

定時觸發的方法以下:

 
private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { var list = DANQueue<User>.TryTakeAll(); if (list.Count > 0) { RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, list); } }

運行Debug測試,爲方便顯示,我減小了一些列,只顯示queue名和發佈速度,能達到每秒1萬左右的量。

 
| test | [fanQueueTest](/#/queues/test/fanQueueTest) | 10,569/s | | test | [fanQueueTest](/#/queues/test/fanQueueTest) | 12,336/s |

談到此時的推送速度,再來回顧下剛開始循環通知的速度,每秒250左右,可見速度提高了50倍!

 
| test | [fanQueueTest](/#/queues/test/fanQueueTest) | 249/s |

源碼

DAN : DelayAggregationNotice 延時聚合通知組件

這裏推薦一下個人Java後端技術羣:834962734 ,羣裏有(分佈式架構、高可擴展、高性能、高併發、性能優化、Spring boot、Redis、ActiveMQ、等學習資源)進羣免費送給每一位Java小夥伴,無論你是轉行,仍是工做中想提高本身能力均可以,歡迎進羣一塊兒深刻交流學習!

總結

通過以上對比,性能從幾千就掛到支撐到每秒上萬的推送量,而且支撐百萬(更高級別沒測試)以上級更新依然健壯運行。

結果如此明顯,若是尚未動力改變,那還有什麼能拯救你呢?

這裏的Timer之後能夠替換成hangfire,由於hangfire有UI監控,能夠查看狀態。hangfire貌似不推薦大數據量的參數,這些細節問題之後能夠根據測試狀況去取捨。

以上僅爲了測試,若是要變成通用可複用,還有更長的路須要走,但比起分佈式追蹤簡單多了,一步一步來,用目標約束本身慢慢實現。

本篇完畢,謝謝觀看。

相關文章
相關標籤/搜索