![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
導讀微信
Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代雲原生分佈式消息流平臺,集消息、存儲、輕量化函數式計算爲一體,採用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據複製,具備強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。架構
![](http://static.javashuo.com/static/loading.gif)
做者介紹
app
冉小龍編輯器
騰訊雲微服務產品中心研發工程師
Apache Pulsar Committer分佈式
Apache BookKeeper Contributoride
![](http://static.javashuo.com/static/loading.gif)
背景svg
Apache Pulsar 系列第一篇文章爲讀者們詳細解釋了 Pulsar 的消息保留和過時策略,本文是系列第二篇,主要從 Pulsar 設計的原理以及在 BookKeeper 中如何存儲作一個梳理。函數
在社區中,咱們常常能夠看到用戶有關 Backlog,storage size 和 retention 等策略的困惑,比較常見的一些問題,諸如:微服務
-
我沒有設置 Retention 策略,爲何經過 topics stats 能夠查看到 storage size 遠大於 backlog size? -
個人 msg backlog size 很小,可是 storage size 確一直在增加? …性能
![](http://static.javashuo.com/static/loading.gif)
Pulsar 的消息模型
首先,咱們先來看一下 Pulsar 的消息模型
![](http://static.javashuo.com/static/loading.gif)
如上圖所示,Pulsar 提供了最基本的 pub-sub 的處理模型。
Producer
首先 Producer 端生產消息,將消息以 append 的形式追加到 Topic 中,這裏具體分發到哪個 Topic 中,根據消息是否設置了 msg key 會有所不一樣。
-
設置了 msg key,消息會基於 key 作 hash,將消息分發到不一樣的 partitions 中 未設置 msg key,消息會以 round robin 的形式,分發到不一樣的 partitions 中
在消息分發的模型中,Pulsar 與 Kafka 相似。
Consumer
在 Consumer 以外,Pulsar 抽象了一層訂閱層,用於訂閱 Topic。經過訂閱層的抽象,Pulsar 能夠靈活的支持 Queue 和 Streaming 這兩種類型的消息隊列。每個 sub 均可以拿到這個 Topic 中全部數據的完整 copy,有點相似 Kafka 中的 consumer group。根據訂閱類型的不一樣,每個訂閱下面能夠有一個或者多個 Consumer 來接收消息。
目前,Pulsar 支持以下四種消息訂閱模型:
-
Exclusive -
Failover -
Shared Key_Shared
![](http://static.javashuo.com/static/loading.gif)
存儲模型
消息在每一個 Partition Topic 的分佈式日誌中只存儲一次
這就意味着,當 Producer 成功發送消息到 Topic 以後,這個消息只會在存儲層存儲一次,不管你有多少個 Subscription 訂閱到這個 Topic 中,實際上操做的都是同一份數據。基於這個基礎,咱們能夠看到 Apache Pulsar 從上到下的層級抽象概念以下圖所示:
![](http://static.javashuo.com/static/loading.gif)
首先第一層抽象是 Topic(Partition),用來存儲 Producer 追加的 messages 信息,Topic 之下對應的是一個個的 ledger,ledger 裏面又劃分爲一個個的分片,在一個個的分片中存儲了更小粒度的 ertries,entries 中存儲的是 【一條】或者 【一個 batch】 的消息。
-
Tips: 在 Pulsar 中,一個 batch 在 broker 端會被看成一條消息來處理,batch 解析的具體邏輯是在 consumer 端接收消息時候去操做的。 Node: 在 Bookkeeper 中,對數據操做的最小單元是按照 segment 這個粒度來進行操做的。
爲何須要作分層抽象呢?
在這裏最直白的解釋其實就是,爲了確保數據被在每個 bk 節點中打的足夠散,分佈的足夠均勻。這也是分層分片架構設計的好處之一。
Ack 機制
在 Pulsar 中支持了兩種 Ack 的機制,分別是單條 Ack 和批量 Ack。單條 Ack(AckIndividual)是指 Consumer 能夠根據消息的 messageID 來針對某一個特定的消息進行 Ack 操做;批量 Ack(AckCumulative)是指一次 Ack 多條消息。
訂閱機制
爲了更好的理解 Strorage Size 以及 Backlog, 咱們首先須要去了解 Pulsar 中的訂閱機制,以下圖所示:
![](http://static.javashuo.com/static/loading.gif)
當有消息積壓時,你能夠經過 clear-backlog 來清除積壓的消息。清除 backlog 中積壓的消息是相對危險的操做,因此係統會提示你,是否確認要刪除 backlog 中的消息, clear-backlog 提供了 -f(--force) 的參數來屏蔽該提示。
Producer 仍是按照追加的形式不斷往 Topic 中發送消息,Consumer 端會建立一個 Subscription 去訂閱這個 Topic,當成功訂閱時,會初始化一個 Cursor 指向具體的消息的位置,默認狀況下是 Latest。
Cursor 是用來存儲一個訂閱中消費的狀態信息
上圖中,咱們能夠看到該訂閱下面的 Topic 已經成功 Receive 而且 Ack 掉了 m4 這條消息。那麼包含 m4 在內的全部的消息狀態都會被標記爲可刪除的狀態。在 Pulsar 中,使用 MarkDeletePosition 來標記這個位置。以後的全部消息,表明這個訂閱尚未消費的消息。
隨着時間的推移,假設在 AckCumulative 的場景下,上述訂閱中的 Consumer 又消費了一些消息,目前 Cursor 的位置移動到了 m8 的位置,意味着 m8 以前的消息均可以進入刪除狀態。
![](http://static.javashuo.com/static/loading.gif)
假設是在 AckIndividual 的場景下,上述訂閱中的 Consumer 只消費了 m7 這條消息而且發送了 Ack 請求,m5, m6 這兩條消息仍然沒有被成功消費,那麼目前處於可刪除狀態的消息是 m4 以前的消息和 m7 這條消息。也就是說,在這種場景下,因爲使用單條 Ack 致使 Topic 中間出現了 Ack 的空洞。
Cursor = Offset + IndevidualDeletes, Ack 會觸發 Cursor 的移動,可是不會刪除任何消息
![](http://static.javashuo.com/static/loading.gif)
隨着時間的推移,在單條 Ack 的場景下,Ack 的空洞可能會本身消失,以下圖所示:
![](http://static.javashuo.com/static/loading.gif)
上面咱們描述了,單個訂閱在單條 Ack 和批量 Ack 混合的場景下,Topic 中 cursor 的移動狀況。假設目前有多個 Subscription 訂閱了這個 Topic,那麼每個 Subscription 均可以拿到這個 Topic 中數據的完整 Copy,也就是一個 Subscription 會在這個 Topic 中初始化一個新的 Cursor, 每個 Cursor 之間消費的進度是沒有交集、互不影響的,因此就可能出現下圖中的狀況:
![](http://static.javashuo.com/static/loading.gif)
在上圖中,針對該 Topic,有兩個訂閱:Subscription-1 和 Subscription-2。Subscription-1中的 Consumer 消費掉了 m4 以前的消息,Subscription-2 中的 Consumer 消費掉了 m8 以前的消息。而 m4-m8 之間的這四條消息,雖然被 Subscription-2 消費完成,可是 Subscription-1 尚未消費完成這部分數據,因此這部分消息還不能夠被刪除。目前處於可刪除狀態的消息是 m4 以前的消息,即這個 Topic 中消費進度最慢的那個 Subscription 所消費完成的消息。那麼這就會有一個問題,假設我目前 Subscription-1 掉線了,它的 Cursor 的位置一直沒有變化,這就會致使這個 Topic 中的數據一直處於不可刪除的狀態。
針對上述場景,Pulsar 引入了 TTL 的概念,即容許用戶設置 TTL 的時間,當消息到達 TTL 指定的閾值 Cursor 仍然沒有移動的話,那麼會觸發 TTL 的機制,將 Cursor 自動向後移到指定的位置。在這裏須要注意的一點是,咱們一直強調的是 TTL 會移動 Cursor 的位置,到目前爲止,咱們尚未提到消息刪除的概念,不要將兩者混淆了。TTL 會作的只是去移動 Cursor 的位置,不會有任何跟消息刪除的邏輯。
Backlog
爲了更好的表述 Topic 中沒有被消費的數據,Pulsar 引入了 Backlog 的概念來描述這一部分消息。Backlog 能夠分爲以下兩種形式:
-
Topic Backlog: 最慢的那個訂閱的 Backlog 的集合 Subscription Backlog: 指針對單個訂閱級別的沒有消費的數據的集合
以下圖所示:Backlog A 屬於 Topic Backlog;Backlog A 屬於 Subscription-1 Backlog;Backlog B 屬於 Subscription-2 的 Backlog。
![](http://static.javashuo.com/static/loading.gif)
隨着時間的推移,Backlog 的會不斷的變化,以下圖所示:
![](http://static.javashuo.com/static/loading.gif)
在這裏須要說明的一點是,這裏的 backlogSize 記錄的是帶 batch 的消息,也就是一個 batch 會被看成一條消息來進行處理。由於在 broker 端去解析整個 batch 會給 broker 帶來必定的負擔,同時浪費大量的 CPU 資源,因此,具體 batch 邏輯的解析放到了 Consumer 端來進行處理。因此 Backlog 本質上記錄的是上面咱們提到的 entries 的數量。
在 Pulsar 中,針對 Backlog 有兩個指標,具體以下:
-
msgBacklog: 記錄的是全部未被 Ack 的 entries 的集合 backlogSize:記錄的是全部沒有被 Ack 的消息的大小
Retention 機制
在 Apache Pulsar 中,使用了 BookKeeper 來做爲存儲層,容許用戶將消息持久化,爲了確保消息不會無限期的持久化下去,Pulsar 引入了 Retention 的機制,容許用戶來配置消息持久化的策略。默認狀況下,持久化的機制是關閉的,即消息被 Ack 以後,就會進入刪除的邏輯。
配置 Retention 策略時,有以下兩個參數能夠指定:
-
size:指持久化大小的閾值。0 表明不配置 Retention 大小策略,-1 表明設置的大小無限大 time:指持久化時間的閾值。0 表明不配置 Retention 時間策略,-1 表明時間無限大
在引入 Retention 策略以後,整個 Topic 表示的視圖以下所示,m0-m5 表明已經被全部訂閱確認的消息而且已經超過了 Retention 策略的閾值,即這些消息正在 準備刪除。注意,我這裏描述的是 【準備刪除】具體是否能夠被刪除,如今還不能肯定。
![](http://static.javashuo.com/static/loading.gif)
在最開始,咱們從最上層的 Topic 一步步抽象到了一條具體的 msg,(在這裏爲了方便描述,咱們忽略掉 batch 的概念,即一條 msg 等價於一個 entry)如今咱們再反過來把全部的概念都疊加回去。由於在 bk 中,容許操做的最小的單元是一個 segment,因此在具體的 msg(entry)級別,是沒辦法針對一條消息進行刪除的,刪除操做須要針對一個 segment 來進行操做。以下圖所示:
假設 m0-m3 屬於 segment3;m4-m7 屬於segment2;m8-m11 屬於 segment1。按照上圖的描述,m0-m5 的消息均可以進行刪除操做, 可是 segment 2 中包含了 m6, m7 並無達到 Retention 的閾值,因此 segment 目前還不能夠被刪除。
Storage Size
爲了更方便的表述當前消息佔用的存儲空間的大小,Pulsar 引入了 storageSize 來描述整個概念。以下圖所示:當 backlog B 與 storage Size 標識的消息相同時,backlogSize 等價於 storageSize。
![](http://static.javashuo.com/static/loading.gif)
當因爲引入單條 Ack,Retention 策略以及 Bookkeeper 基於 segment 刪除的設定,那麼頗有可能形成 Storage Size 大於 backlog Size 的場景,以下圖所示:
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
總結
消息在每一個 Partition Topic 的分佈式日誌中只會存儲一次
Cursor 是用來存儲一個訂閱下 Consumer 的消費狀態的
Cursor 等價於 offset(kafka)+ individualDeletes
Ack 會去更新 Topic 中 Cursor 的位置
當某條消息被全部訂閱者都 Ack 以後,這條消息進入【能夠被刪除】的狀態
全部沒有被確認的消息會一直保存在 Subscription backlog 中
TTL 能夠經過設定一個時間閾值來自動更新 Cursor 的位置
Retention 策略是用來操做那些被 Ack 以後的消息應該怎麼處理
消息的刪除是以 segment 爲單位的,而不是 entry。
往期
推薦
《生存仍是毀滅?一文帶你看懂 Pulsar 的消息保留和過時策略》
![](http://static.javashuo.com/static/loading.gif)
掃描下方二維碼關注本公衆號,
瞭解更多微服務、消息隊列的相關信息!
解鎖超多鵝廠周邊!
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
本文分享自微信公衆號 - 騰訊雲中間件(gh_6ea1bc2dd5fd)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。