個推基於 Apache Pulsar 的優先級隊列方案


做者:個推平臺研發工程師 祥子前端

1、業務背景
在個推的推送場景中,消息隊列在整個系統中佔有很是重要的位置。

當 APP 有推送需求的時候, 會向個推發送一條推送命令,接到推送需求後,咱們會把APP要求推送消息的用戶放入下發隊列中,進行消息下發;當同時有多個APP進行消息下發時,不免會出現資源競爭的狀況, 所以就產生了優先級隊列的需求,在下發資源固定的狀況下, 高優先級的用戶須要有更多的下發資源。
2、基於 Kafka 的優先級隊列方案
針對以上場景,個推基於 Kafka 設計了初版的優先級隊列方案。Kafka 是 LinkedIn 開發的一個高性能、分佈式消息系統;Kafka 在個推有很是普遍的應用,如日誌收集、在線和離線消息分發等。java

架構

在該方案中,個推將優先級統一設定爲高、中、低三個級別。具體操做方案以下:git

1. 對某個優先級根據 task (單次推送任務)維度,存入不一樣的 Topic,一個 task 只寫入一個 Topic,一個 Topic 可存多個 task;github

2. 消費模塊根據優先級配額(如 6:3:1),獲取不一樣優先級的消息數,同一優先級輪詢獲取消息;這樣既保證了高優先級用戶能夠更快地發送消息,又避免了低優先級用戶出現沒有下發的狀況。apache

Kafka 方案遇到的問題api

隨着個推業務的不斷髮展,接入的 APP 數量逐漸增多,初版的優先級方案也逐漸暴露出一些問題:緩存

1. 當相同優先級的 APP 在同一時刻推送任務愈來愈多時,後面進入的 task 消息會由於前面 task 消息還存在隊列狀況而出現延遲。以下圖所示, 當 task1 消息量過大時,在task1 消費結束前,taskN 將一直處於等待狀態。

2. Kafka 在 Topic 數量由 64 增加到 256 時,吞吐量降低嚴重,Kafka 的每一個 Topic、每一個分區都會對應一個物理文件。當 Topic 數量增長時,消息分散的落盤策略會致使磁盤 IO 競爭激烈,所以咱們不能僅經過增長 Topic 數量來緩解第一點中的問題。服務器

基於上述問題,個推動行了新一輪的技術選型, 咱們須要能夠建立大量的 Topic, 同時吞吐性能不能比 Kafka 遜色。通過一段時間的調研,Apache Pulsar 引發了咱們的關注。架構

3、爲何是 Pulsar
Apache Pulsar 是一個企業級的分佈式消息系統,最初由 Yahoo 開發,在 2016 年開源,並於2018年9月畢業成爲 Apache 基金會的頂級項目。Pulsar 已經在 Yahoo 的生產環境使用了三年多,主要服務於Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa (Yahoo 的 KV 存儲)。併發

架構

Topic 數量
Pulsar 能夠支持百萬級別 Topic 數量的擴展,同時還能一直保持良好的性能。Topic 的伸縮性取決於它的內部組織和存儲方式。Pulsar 的數據保存在 bookie (BookKeeper 服務器)上,處於寫狀態的不一樣 Topic 的消息,在內存中排序,最終聚合保存到大文件中,在 Bookie 中須要更少的文件句柄。另外一方面 Bookie 的 IO 更少依賴於文件系統的 Pagecache,Pulsar 也所以可以支持大量的主題。

消費模型
Pulsar 支持三種消費模型:Exclusive、Shared 和Failover。


  • Exclusive (獨享):一個 Topic 只能被一個消費者消費。Pulsar 默認使用這種模式。
  • Shared(共享):共享模式,多個消費者能夠鏈接到同一個 Topic,消息依次分發給消費者。當一個消費者宕機或者主動斷開鏈接時,那麼分發給這個消費者的未確認(ack)的消息會獲得從新調度,分發給其餘消費者。
  • Failover (災備):一個訂閱同時只有一個消費者,能夠有多個備份消費者。一旦主消費者故障,則備份消費者接管。不會出現同時有兩個活躍的消費者。

Exclusive和Failover訂閱,僅容許一個消費者來使用和消費每一個訂閱的Topic。這兩種模式都按 Topic 分區順序使用消息。它們最適用於須要嚴格消息順序的流(Stream)用例。


Shared 容許每一個主題分區有多個消費者。同一個訂閱中的每一個消費者僅接收Topic分區的一部分消息。Shared最適用於不須要保證消息順序隊列(Queue)的使用模式,而且能夠按照須要任意擴展消費者的數量。

存儲
Pulsar 引入了 Apache BookKeeper 做爲存儲層,BookKeeper 是一個專門爲實時系統優化過的分佈式存儲系統,具備可擴展、高可用、低延遲等特性。具體介紹,請參考 [BookKeeper官網](https://github.com/apache/bookkeeper)。

Segment
BookKeeper以 Segment (在 BookKeeper 內部被稱做 ledger) 做爲存儲的基本單元。從 Segment 到消息粒度,都會均勻分散到 BookKeeper 的集羣中。這種機制保證了數據和服務均勻分散在 BookKeeper 集羣中。

Pulsar 和 Kafka 都是基於 partition 的邏輯概念來作 Topic 存儲的。最根本的不一樣是,Kafka 的物理存儲是以 partition 爲單位的,每一個 partition 必須做爲一個總體(一個目錄)存儲在某個 broker 上。 而 Pulsar 的 partition 是以 segment 做爲物理存儲的單位,每一個 partition 會再被打散並均勻分散到多個 bookie 節點中。

這樣的直接影響是,Kafka 的 partition 的大小,受制於單臺 broker 的存儲;而 Pulsar 的 partition 則能夠利用整個集羣的存儲容量。

擴容
當 partition 的容量達到上限後,須要擴容的時候,若是現有的單臺機器不能知足,Kafka 可能須要添加新的存儲節點,並將 partition 的數據在節點之間搬移達到 rebalance 的狀態。

而 Pulsar 只需添加新的 Bookie 存儲節點便可。新加入的節點因爲剩餘空間大,會被優先使用,接收更多的新數據;整個擴容過程不涉及任何已有數據的拷貝和搬移。

Broker 故障
Pulsar 在單個節點失敗時也會體現一樣的優點。若是 Pulsar 的某個服務節點 broker 失效,因爲 broker 是無狀態的,其餘的 broker 能夠很快接管 Topic,不會涉及 Topic 數據的拷貝;若是存儲節點 Bookie 失效,在集羣后臺中,其餘的 Bookie 會從多個 Bookie 節點中併發讀取數據,並對失效節點的數據自動進行恢復,對前端服務不會形成影響。

Bookie 故
Apache BookKeeper 中的副本修復是 Segment (甚至是 Entry)級別的多對多快速修復。這種方式只會複製必須的數據,這比從新複製整個主題分區要精細。以下圖所示,當錯誤發生時, Apache BookKeeper 能夠從 bookie 3 和 bookie 4 中讀取 Segment 4 中的消息,並在 bookie 1 處修復 Segment 4。全部的副本修復都在後臺進行,對 Broker 和應用透明。

當某個 Bookie 節點出錯時,BookKeeper會自動添加可用的新 Bookie 來替換失敗的 Bookie,出錯的 Bookie 中的數據在後臺恢復,全部 Broker 的寫入不會被打斷,並且不會犧牲主題分區的可用性。

4、基於 Pulsar 的優先級隊列方案

在設計思路上,Pulsar 方案和 Kafka 方案並無多大區別。但在新方案中,個推技術團隊藉助 Pulsar 的特性,解決了 Kafka 方案中存在的問題。
1. 根據 task 動態生成 Topic,保證了後進入的 task 不會由於其餘 task 消息堆積而形成等待狀況。
2. 中高優先級 task 都獨享一個 Topic,低優先級 task 共享 n 個 Topic。
3. 相同優先級內,各個 task 輪詢讀取消息,配額滿後流轉至下一個優先級。
4. 相同優先級內, 各個 task 可動態調整 quota, 在相同機會內,可讀取更多消息。
5. 利用 Shared 模式, 能夠動態添加刪除 consumer,且不會觸發 Rebalance 狀況。
6. 利用 BookKeeper 特性,能夠更靈活的添加存儲資源。

 5、Pulsar 其餘實踐
1. 不一樣 subscription 之間相對獨立,若是想要重複消費某個 Topic 的消息,須要使用不一樣的 subscriptionName 訂閱;可是一直增長新的 subscriptionName,backlog 會不斷累積。
2. 若是 Topic 無人訂閱,發給它的消息默認會被刪除。所以若是 producer 先發送,consumer 後接收,必定要確保 producer 發送以前,Topic 有 subscription 存在(哪怕 subscribe 以後 close 掉),不然這段時間發送的消息會致使無人處理。
3. 若是既沒有人發送消息,又沒有人訂閱消息,一段時間後 Topic 會自動刪除。
4. Pulsar 的 TTL 等設置,是針對整個 namespace 起效的,沒法針對單個 Topic。
5. Pulsar 的鍵都創建在 zookeeper 的根目錄上,在初始化時建議增長總節點名。
6. 目前 Pulsar 的 java api 設計,消息默認須要顯式確認,這一點跟 Kafka 不同。
7. Pulsar dashboard 上的 storage size 和 prometheus 上的 storage size (包含副本大小)概念不同。
8. 把`dbStorage_rocksDB_blockCacheSize` 設置的足夠大;當消息體量大,出現backlog 大量堆積時, 使用默認大小(256M)會出現讀耗時過大狀況,致使消費變慢。
9. 使用多 partition,提升吞吐。
10. 在系統出現異常時,主動抓取 stats 和 stats-internal,裏面有不少有用數據。
11. 若是業務中會出現單 Topic 體量過大的狀況,建議把 `backlogQuotaDefaultLimitGB` 設置的足夠大(默認10G), 避免由於默認使用`producer_request_hold` 模式出現 block producer 的狀況;固然能夠根據實際業務選擇合適的 `backlogQuotaDefaultRetentionPolicy`。
12. 根據實際業務場景主動選擇 backlog quota。
13. prometheus 內若是發現讀耗時爲空狀況,多是由於直接讀取了緩存數據;Pulsar 在讀取消息時會先讀取 write cache, 而後讀取 read cache;若是都沒有命中, 則會在 RocksDB 中讀取條目位子後,再從日誌文件中讀取該條目。
14. 寫入消息時, Pulsar 會同步寫入 journal 和 write cache;write cache 再異步寫入日誌文件和 RocksDB; 因此有資源的話,建議 journal 盤使用SSD。

6、總結

如今, 個推針對優先級中間件的改造方案已經在部分現網業務中試運行,對於 Pulsar 的穩定性,咱們還在持續關注中。
做爲一個2016 年纔開源的項目,Pulsar 擁有很是多吸引人的特性,也彌補了其餘競品的短板,例如跨地域複製、多租戶、擴展性、讀寫隔離等。儘管在業內使用尚不普遍, 但從現有的特性來講, Pulsar 表現出了取代 Kafka 的趨勢。在使用 Pulsar 過程當中,咱們也遇到了一些問題, 在此特別感謝翟佳和郭斯傑(兩位均爲 Stream Native 的核心工程師、開源項目 Apache Pulsar 的 PMC 成員)給咱們提供的支持和幫助。


參考文獻:

[1] 比拼 Kafka, 大數據分析新秀Pulsar 到底好在哪(https://www.infoq.cn/article/1UaxFKWUhUKTY1t_5gPq)

[2] 開源實時數據處理系統Pulsar:一套搞定Kafka+Flink+DB(https://juejin.im/post/5af414365188256717765441)

相關文章
相關標籤/搜索