隨着公司的快速發展,天天推送的消息量不斷增長。以前的舊消息系統已經日益不能知足現階段推送場景的功能要求,咱們就開始從0到1構建一個完整的消息系統。java
原始架構中存在各類各樣的痛點與挑戰:redis
業務接入比較慢,發送不一樣類型的消息須要接入不一樣的api. 消息消費處理都很慢,影響活動運營體驗。mongodb
業務調用量統計不清楚,沒法針對不一樣業務進行關閉/限流。api
缺少優先級消息,在營銷類消息大批量推送時,正常的訂單相關的消息就會堆積,阻塞會被延時。安全
經過下列4個方面進行優化:架構
切換數據源存儲,選擇寫比es相對高一點和讀相對es少一點的mongodb
優化
消息平臺目前總體架構*
spa
目前消息平臺的優先級採用2種發送實現的,首先使用傳統的消息隊列kafka進行一次優先級發送和消費,後面採用優先線程池任務進行優先消息發送。線程
kafka自己並不支持優先級,咱們經過下列2個方案進行處理,人爲的對kafka不一樣隊列發送不一樣優先級消息。接口
採用建立不一樣的topic,不一樣優先級消息發送到不一樣的topic中,同時在消息消費的時候,按照不一樣的比例獲取不一樣topic的數據進行消費。
目前的順序是優先級最高的是第二高的2倍,依次進行下去。最後剩餘的拉取消息值加入到優先級最高的裏面 好比一次拉取50條,3個topic 那麼咱們就是按照 (25 +5/ 13 / 7) 進行拉取。
高優先級消息沒有了如何讓低優先級的消息滿負載拉取,即按照上述優先級最低的消息一次性拉取50條消息呢?
引入消息拉取狀態機,優先級消息比較低的時候,加大低優先級的消費。目前消息服務狀態機,有初始化、低負載、高負載等幾個狀態,經過判斷上一次處理的消息條數來肯定消息消費者當前的狀態並進行拉取參數的修改,目前採用反射的方式修改kafka的拉取數量。
爲了加快速度發送咱們也採用了本地線程池,本地線程任務,咱們採用任務優先級隊列。下面是提交一個線程任務的流程。
在上圖中,咱們經過給一個線程任務一個自增的序列號以及以前定義的優先級值進行比較,惟一肯定一條任務的執行優先級。
首先咱們定義延時/定時策略有一下幾個策略:
咱們將延時定時的消息區分上述3種類型以後,分別有不一樣的實現方式。在低於15s的時候咱們直接採用了java自帶的delayTask進行消息判斷&推送。而高於15s低於30分的,咱們本身建立了一個秒基本級別的單時間輪,進行消息推送,下述是時間輪的執行。
可是咱們在這個基礎上進行了部分優化,參考了kafka的延時隊列,當時間輪中沒有須要執行的任務以後,咱們直接對執行的線程進行wait等待,直到下個任務提交notify喚醒。對於高於30分鐘的延時任務,咱們通常先進行消息任務的存儲,在任務快要執行的30分鐘以前,咱們將任務數據加入到秒級別的時間輪中,參考第二種進行消息發送。(爲啥不用天/小時級別時間輪,純粹是不想浪費內存)
咱們在消息推送過程當中,用戶的防疲勞是必要的,目前消息中心的防疲勞場景主要有如下幾種類型:
咱們在上述幾種場景,主要採用的是mongo進行數據的存儲和聚合查詢,由於若是使用redis場景在多用戶的時候,會頻繁的操做redis,並非很好。且大部分防疲勞的數據咱們也僅僅最多保留1周,mongo的集合,很容易把咱們這些功能知足。固然在某個具體的業務1天內只能收到1條消息這個場景中,咱們採用了redis的helperLogLog進行防疲勞,減小查詢和內存消耗。雖然有一點的偏差,可是咱們的使用場景上影響不是很高。
最後咱們從本身的實戰總結下來構建一個消息平臺須要思考的點:
關注得物技術,攜手走向技術的雲端