理解Apache Pulsar工作原理

理解Apache Pulsar工作原理

原創:Jack Vanlightly
原文鏈接:https://jack-vanlightly.com/blog/2018/10/2/understanding-how-apache-pulsar-works

在這篇文章中,我們將介紹Apache Pulsar的設計,這篇文章不適合想要了解如何使用Apache Pulsar的讀者,適合想要了解Apache Pulsar是如何工作的讀者。
設計核心

  • 保證不丟失消息(使用正確的配置且不是整個數據中心故障)
  • 強順序性保證
  • 可預測的讀寫延遲

Apache Pulsar選擇一致性而不是可用性就像Bookkeeper和Zookeeper一樣。Apache Pulsar盡一切努力保持一致性。

這篇文章中不會介紹跨機房複製相關的內容,我們只關注一個集羣。

多層抽象

Apache Pulsar在上層具有高級別的Topic(主題)和Subscription(訂閱)的概念,在底層數據存儲在二進制文件中,這些數據交叉分佈在多個服務器上的多個Topic。在其中包含很多的細節部分。我個人認爲把它分成不同的抽象層更容易理解Apache Pulsar的架構設計,所以這就是我在這篇文章中要做的事情。

接下來我們一層一層的進行分析
圖1.分層抽象
圖1.分層抽象

第一層 - Topic、Subscription和Cursors

我們將要簡要介紹Topic(主題)、Subsription(訂閱)和Cursors(遊標)的基本概念,不會包含深層次的消息傳遞方式。
圖2.Topic和Subscriptions
圖2.Topic和Subscription
消息存儲在Topic中。邏輯上一個Topic是日誌結構,每個消息都在這個日誌結構中有一個偏移量。Apache Pulsar使用遊標來跟蹤偏移量。生產者將消息發送到一個指定的Topic,Apache Pulsar保證消息一旦被確認就不會丟失(正確的配置和非整個集羣故障的情況下)。

消費者通過訂閱來消費Topic中的消息。訂閱是遊標(跟蹤偏移量)的邏輯實體,並且還根據不同的訂閱類型提供一些額外的保證

  • Exclusive(獨享) - 一個訂閱只能有一個消息者消費消息
  • Shared(共享) - 一個訂閱中同時可以有多個消費者,多個消費者共享Topic中的消息
  • Fail-Over(災備) - 一個訂閱同時只有一個消費者,可以有多個備份消費者。一旦主消費者故障則備份消費者接管。不會出現同時有兩個活躍的消費者。

一個Topic可以添加多個訂閱。訂閱不包含消息的數據,只包含元數據和遊標。

Apache Pulsar通過允許消費者將Topic看做在消費者消費確認後刪除消息的隊列,或者消費者可以根據遊標的回放來提供隊列和日誌的語義。在底層都使用日誌作爲存儲模型。

如果沒有對Topic設置數據保留策略(目前通過其命名空間,後面會提供Topic級別的設置),一旦一個Topic的所有訂閱的遊標都已經成功消費到一個偏移量時,此偏移量前面的消息就會被自動刪除。也就是說需要該Topic的所有訂閱上得到消費確認。

但是,如果Topic設置了數據保留策略,已經消費確認的消息超過保留策略閾值(Topic的消息存儲大小、Topic中消息保留的時間)後會被刪除。

消費者可以以單條或者累積的方式確認消息。累積確認會有更好的吞吐量,但是在消息消費失敗後會引入重複的消息處理。注意,累積消費不適用於共享模式的訂閱,因爲累積確認是基於偏移量的。但是在客戶端API中支持批量確認,這樣會減少RPC調用次數來提高在共享模式下訂閱競爭消費的吞吐量。

最後,有一些類似於kafka Topic的分區(Partition)。區別在於Apache Pulsar中的分區也是Topic。就像kafka一樣,生產者可以輪詢、hash或者明確指定分區來發送消息。

以上都是對上層概念的一些介紹,下面我們將進行深入的研究。

第二層 - 邏輯存儲模型

現在該介紹Apache Bookkeeper了。我將在Apache Pulsar的背景下討論Bookkeeper,儘管Bookkeeper是一個通用的分佈式日誌存儲解決方案。

首先,Bookkeeper將數據存儲至集羣中的節點上,每個Bookkeeper節點稱爲Bookie。其次,Pulsar和Bookkeeper都使用Apache Zookeeper來存儲元數據和監控節點健康狀況。
圖3.Apache Pulsar、Bookkeeper和Zookeeper
圖3.Apache Pulsar、Bookkeeper和Zookeeper

一個Topic實際上是一個ledgers流。Ledger本身就是一個日誌。所以一系列的子日誌(Ledgers)組成了一個父日誌(Topic)。

Ledgers追加到一個Topic,條目(消息或者一組消息)追加到Ledgers。Ledger一旦關閉是不可變的。Ledger作爲最小的刪除單元,也就是說我們不能刪除單個條目而是去刪除整個Ledger。

Ledgers本身也被分解爲多個Fragment。Fragment是Bookkeeper集羣中最小的分佈單元

圖4.條目存儲在底層
圖4.條目存儲在底層

Topic是Pulsar中的概念。Ledger和Fragment是Bookkeeper中的概念,儘管Pulsar知道且使用Ledgers和Fragment。

每個Ledger(由一個或多個Fragment組成)可以跨多個Bookkeeper節點(Bookies)進行復制,以實現數據容災和提升讀取性能。每個Fragment都在一組不同的Bookies中複製(存在足夠的Bookies)。
圖5.Apache Pulsar、Bookkeeper和Zookeeper協同工作
圖5.Apache Pulsar、Bookkeeper和Zookeeper協同工作

每個Ledger有三個關鍵配置:

Ensemble Size (E)
Write Quorum Size (Qw)
Ack Quorum Size (Qa)

這些配置可以應用到Topic級別,然後pulsar會在Topic使用的Bookkeeper Ledgers/Fragments上設置。

注意:Ensemble表示將要寫入的實際的Bookies數量,以下用E表示。E表示Pulsar需要使用的Bookies數量。請注意,在配置時您至少需要E個bookies才能正常的使用。默認情況下,從可用的bookies列表中隨機選取E個bookies(每個bookie在Zookeeper中註冊自己)。

通過將Bookies標記爲屬於特定機架,還可以選擇配置機架感知。機架可以是邏輯結構(例如:雲環境中的可用區域)。通過機架感知策略,Pulsar Broker的Bookkeeper客戶端將嘗試從不同的機架選擇Bookies。也可以自定義策略是定製化Bookies的選擇。

Ensemble Size (E) 決定了Pulsar寫入Ledger可用的Bookies池的大小。每個Fragment可能有不同的Bookies列表,Broker將在創建Fragment時選擇一組Bookies,E的數量是一致的。必有足夠的Bookies數量(> E)。

Write Quorum (Qw) 是Pulsar將要寫入的實際的Bookies數量。可以等於或者小於E。

圖6.E = 3 Qw = 3
圖6.E = 3 Qw = 3

當Qw小於E時,以條帶化的方式分配讀/寫即每個Bookie只提供讀寫請求的子集。因此可以提升吞吐量,將次延遲。

圖7.條帶化
圖7.條帶化

Ack Quorum (Qa) 是確認寫入Bookies的數量,Pulsar Broker將確認發送給客戶端。爲了一致性,Qa應該是:(Qw + 1) / 2 或者更大。

在實踐中:
(Qa == Qw) 或
(Qa == Qw - 1) —> 這樣避免單節點響應緩慢而改善寫入延遲。

最終,每個Bookie必須都接受寫入。如果我們總是等待所有Bookies做出響應,我們可能因爲因爲單個Bookie響應緩慢帶來的整體延遲上升。Pulsar畢竟承諾有可預測的延遲。

當創建一個新的Topic或者Ledger滾動時會創建一個新的Ledger。Ledger在以下這些情況會發生滾動並創建新的Ledger:

  • 已達到Ledger的大小或時間限制。
  • Ledger的所有權(Pulsar Broker的所有權)發生變化(稍後會詳細介紹)。

以下情況會創建新的Fragment:

  • 創建新的Ledger。
  • 當前Fragment使用Bookies發生寫入錯誤或超時。

當一個Bookies無法服務寫入操作時,Pulsar Broker會創建一個新的Fragment,並確保寫入的Bookies數量達到Qw(Write Quorum)值,會不斷的重試直到消息被持久化。

通過前面的介紹我們可以得到以下認識:

  1. 增加E以優化延遲和吞吐量。增加Qw犧牲吞吐量實現冗餘。增加Qa提升數據的容災但會增加延遲和單一節點響應慢導致的延遲增加。
  2. E和Qw不是Bookies的列表。它們支持表明可爲給定的Ledger服務的Bookies池有多大。Pulsar將在創建新的Ledger或Fragment時使用E和Qw。每個Fragment都有一組固定的Bookies且不可變。
  3. 添加一個新的Bookies不意味着需要手動Rebalance。這些新的Bookies將自動成爲Fragment的候選人。加入集羣后,將在創建新的Fragment/Ledger後立即寫入新的Bookies。每個Fragment都可以存儲在不同的Bookies的子集中!我們不需要明確將Topic或Ledger分配指定的Bookies。

我們停下來總結一下。相對於kafka,這是一個非常不同且複雜的模型。對於kafka,每個Partition副本都完整的存儲在kafka節點上。Partition以及Partition副本由一系列的Segment和索引文件組成。

kafka模型的有點在於簡單快捷。所有讀寫都是順序的。不好的是,單個節點必須有足夠的磁盤空間來處理副本,因此非常大的副本可能會迫使你是用非常大的磁盤。第二個缺點是,在集羣擴展時必須做Rebalance。這個過程是比較痛苦的,需要良好的計劃和執行來保證沒有任何故障的情況下分散節點的存儲壓力。

回到Pulsar + Bookkeeper模型。Topic中的數據分佈在多個Bookies上。Topic被分割成Ledgers,Ledgers被分割成Fragments分佈在Fragment使用的Bookies上。當需要做集羣擴展時,只需添加更多Bookies,它們就會在創建新的Fragment時開始在的Bookies上寫入數據,不再需要kafka的Rebalance操作。但是,讀取和寫入現在在Bookies之間跳躍。我們很快將看到Pulsar是如何管理的。

但現在每個Pulsar Broker都需要跟蹤每個Topic所包含的Ledgers和Fragments。這個元數據存儲在Zookeeper中,如果丟失了將會遇到非常嚴重的問題。

在存儲層中,我們往Bookkeeper集羣中均勻的寫入一個Topic的數據。我們避免了將Topic或者副本的數據整體寫到一個特定節點的缺陷,這避免了痛苦的Rebalance。

第二層 - Pulsar Broker 和 Topic 所有權

同樣在第二層抽象層,我們有Pulsar Brokers。Pulsar Broker是無狀態的,沒有不能丟失的持久化狀態。它們與存儲層分開。Bookeeper集羣本身並不執行復制,每個Bookies只是一個跟隨者被領導者人知做什麼,領導人是Pulsar Broker。每個Topic都由一個Pulsar Broker擁有,該Broker提供Topic的所有讀寫操作。

當Pulsar Broker接收到寫入請求時,它會對當前Topic的當前Fragment使用的Bookies執行寫入。(這一段需要確認一下)。

在通常情況下,當前的Ledger中將有一個Fragment。一旦Broker承認寫入(滿足Qa),Pulsar Broker將向生產者客戶端發送確認。

只有在所有之前消息都已經通過確認時(滿足Qa)才能發送確認。如果對於給定的消息,Bookie響應錯誤或者根本沒有響應,則Broker將在新的Bookies上創建新的Fragment(不包含有問題的Bookie)。

圖8.一個Topic在一個Broker上的讀取和寫入
圖8.一個Topic在一個Broker上的讀取和寫入

請注意,Broker只會等待Bookies的Qa確認。

讀取也是通過擁有此Topic的Broker。作爲給定Topic的單一入口點,Broker知道那些偏移量已經安全的保存到Bookkeeper中。它只需要從一個Bookie讀取即可進行讀取。我們將在第3層中看到它如何利用緩存從其內存緩存中提供讀操作而不需要將讀取發送到Bookkeeper。

圖9.只需要對一個Bookie進行讀取
圖9.只需要對一個Bookie進行讀取

Pulsar Broker的健康狀態由Zookeeper監控。當Broker不可用時(Zookeeper認爲),會發生所有權變更。新的Broker成爲Topic的所有者,所有客戶端連接都會被重定向到此Broker。此Topic的讀寫將由新的所有者提供服務。

Bookkeeper有一個非常重要的功能成爲Fencing。Fencing保證了Bookkeeper只有一個編寫器(Pulsar Broker)可以寫入Ledger。

工作原理如下:

  1. Topic X 的當前擁有者(B1)不可用(通過Zookeeper)。
  2. 其他Broker(B2)將Topic X 的當前Ledger狀態從OPEN修改爲IN_RECOVERY。
  3. B2向Ledger的當前Fragment的Bookies發送fence信息並等待(Qw-Qa) + 1個Bookies響應。收到此響應數後Ledger將變成fenced。如果舊的Broker仍然處於活躍狀態則無法再進行寫入,因爲無法獲得Qa確認(由於fencing導致異常響應)。
  4. B2然後從Fragment的Bookies獲得他們最後確認的條目是什麼。它需要最新條目的ID,然後從該點開始向前讀。它確保從哪一點開始的所有條數(可能以前未向Pulsar Broker承認)都會被複制到Qw Bookies。一旦B2無法讀取並複製任何條目,Ledger將完全恢復。
  5. B2將Ledger的狀態更改爲CLOSED。
  6. B2現在可以創建新的Ledger並接受寫入請求。

這種架構的偉大之處在於,通過讓領導人(Pulsar Broker)沒有狀態,Bookkeeper的fencing特性可以很好的處理腦裂問題。沒有腦裂,沒有分歧,沒有數據丟失。

第二層 - Cursor跟蹤

每個Subscription都存儲一個Cursor。Cursor是日誌中的當前偏移量。Subscription將其Cursor存儲至Bookkeeper的Ledger中。這使Cursor跟蹤可以像Topic一樣進行擴展。

第三層 - Bookie 存儲

Ledgers和Fragments是在Zookeeper中維護和跟蹤的邏輯結構。物理上數據不存儲在Ledgers和Fragments對應的文件中。Bookkeeper中存儲的實現是可拔插的,Pulsar默認使用名稱爲DbLedgerStorage的存儲實現。

當在Bookie上寫入數據時,首先將該消息寫入日誌文件,這是一個預寫日誌(WAL),它可以幫助Bookkeeper在發生故障時避免數據丟失。它與關係型數據庫持久化保證的機制相同。

寫入操作也會寫入緩存。寫入的緩存會在內存中做積累並定期進行排序和刷盤。對寫入進行排序以便將同一Ledger的條目放在一起,從而提高讀取性能。如果條目以嚴格的時間順序寫入,在讀取時無法利用磁盤的高效順序操作。通過聚合和排序,我們實現了Ledger級別的時間順序,這是我們關心的。

寫入緩存還將條目寫入RocksDB,存儲每個條目的位置索引。它只是將(LedgerId,EntryId)映射到(EntryLogId,文件中的偏移量)。

由於寫入緩存具有最新的消息,因此在讀取時嘗試讀取寫緩存,如果沒有命中嘗試讀取讀緩存。如果兩者都沒有命中,則會從RocksDB中查找條目的位置,然後在日誌文件中讀取該條目並且會更新到讀緩存中以便後續請求命中緩存。這兩層緩存意味着讀取通常可以在內存中完成。

Bookkeeper容許將磁盤IO做讀寫分離。寫入都按順序寫入日誌文件可以存儲在專用的磁盤上,並且可以批量刷盤以獲得搞得吞吐量。除此之外從寫入操作來看沒有其他的同步磁盤IO操作,數據都是寫入到內存的緩存區。

寫緩存通過異步的方式批量將條目寫入到日誌文件和RocksDB,因此,一個磁盤用於同步寫入日誌文件,另一個磁盤用於異步寫入條目和讀取操作,

在讀取的一邊,讀取操作由Read Cache或Log Entry文件和RocksDB提供。

還要考慮到寫入會佔滿入口網絡帶寬,讀取會佔滿出口網絡帶寬,但是他們不會相互影響。

優雅的實現了磁盤和網絡中讀和寫的隔離。
圖10.DbLedgerStorage(Apache Pulsar默認使用)架構的Bookie
圖10.DbLedgerStorage(Apache Pulsar默認使用)架構的Bookie

第三層 - Pulsar Broker 緩存

每個Topic都有一個所屬的Broker,所以讀寫都是通過該Broker進行的。這樣提供了很多的好處。

首先,Broker可以將日誌的尾部緩存在內存中,這意味着Broker可以不需要Bookkeeper的情況下爲讀取尾部數據的操作提供服務。這比避免了網絡的開銷以及Bookie上可能的磁盤讀取。

Broker也知道Last Add Confirmed條目的ID。這樣可以跟蹤那條消息是最後一個安全持久化的消息。

當Broker的緩存中沒有消息時將從消息所在的Fragment使用的一個Bookie請求數據。這樣可能必須承擔額外的網絡開銷和可能的磁盤讀取成本。

因此,我們從高層次描述了消息的邏輯和物理結構以及Pulsar集羣中的不同參與者以及它們的關係。有很多細節尚未涵蓋,我們會在其他文章中做更詳細的解析。

接下來我們將介紹Apache Pulsar集羣中如何確保在節點故障後消息可以得到充分的複製。

數據恢復協議

當一個Bookie故障時,所有的在這個Bookie的含有Fragments的Ledgers現在都處於複製中。恢復是重新複製Fragments的過程,以確保每個Ledgers維護的副本數達到Qw。

有兩種類型的恢復方法:手動或自動。兩者複製的協議是相同的,但自動恢復使用內置的失敗節點檢查機制來執行要執行的重複複製任務。手動過程需要手動干預。

我們將專注於自動恢復模式。

自動恢復可以在一組專用的服務器或者在Bookies上通過AutoRecoveryMain運行。其中一個自動恢復的過程被選爲Auditor。Autitor的作用是檢查不可用的Bookie然後做一下操作:

  1. 讀取Zookeeper上完整的Ledgers清單,找到託管在失敗的Bookie上的Ledger。
  2. 對於每個Ledger,它將在Zookeeper中的 /underreplicated znode節點上創建重新複製任務。

如果Auditor節點出現故障,則另一個節點將晉升爲Auditor。Auditor是AutoRecoveryMain進行中的一個線程。

AutoRecoveryMain進程還有運行復制任務的Worker線程。每個Worker都是監聽不同的znode節點以發現新的複製任務。

發現任務時會嘗試鎖定。如果無法鎖定就會執行後面的任務。

如果獲得鎖,那麼:

  1. 掃描Ledgers,查找不屬於當前Bookie的Fragment。
  2. 對於每個匹配的Fragment,它將另一個Bookie的數據複製到它自己的Bookie,用新的集合更新Zookeeper並將Fragment標識爲Fully Replicated。

如果Ledgers仍然存在副本數不足的Fragment,則釋放鎖。如果所有Fragment都已經Fully Replicated,則從/underreplicated刪除重複複製任務。

如果一個Fragment沒有結束條目ID,則複製任務將等待並再次檢查,如果Fragment仍然沒有結束條目ID,則它會在重新複製Fragment之前對分類賬進行fence操作(Bookkeeper Fencing)。

所以,使用自動恢復模式Pulsar集羣能夠完全複製詳細數據以確保每個Ledger有足夠的副本數。系統管理員要確保報數適量的Bookies。

Zookeeper

Pulsar和Bookeeper都需要Zookeeper。如果Pulsar節點失去所有的Zookeeper節點的可見性,那麼它將停止接受讀寫操作並重新啓動。這是一種保護措施避免集羣形成不一致的狀態。

這意味着如果Zookeeper發生故障,一切都變得不可用,Pulsar節點的緩存也將被清除。因此在恢復服務時所有的讀取操作都發送到Bookkeeper,理論上可能存在延遲峯值。

結束語

  • 每個Topic都有一個歸屬的Broker。
  • 每個Topic在邏輯上分解爲Ledgers、Fragments和Entries。
  • Fragments分佈在Bookie集羣中。Topic與Bookie並不耦合。
  • Fragments可以跨多個Bookies帶狀分佈。
  • 當Pulsar Broker不可用時,該Broker持有的Topic所有權將轉移至其他的Broker。Fencing機制避免了同一個Topic當前的Ledger同時有兩個所有者(Broker)。
  • 當Bookie不可用時,自動恢復(如果啓用)將自動進行數據重新複製到其他的Bookies。如果禁用,則可以手動啓動此過程。
  • Broker緩存尾部消息日誌,可以非常高效的爲尾部讀取操作提供服務。
  • Bookies使用Journal提供持久化保證。該日誌可用於故障恢復時恢復尚未寫入Entry Log文件的數據。
  • 所有Topic的的條目都保存在Entry Log文件中。查找索引保存在RocksDB中。
  • Bookies讀取邏輯如下:Write Cache -> Read Cache -> Log Entry Files(RocksDB 作爲索引)
  • Bookies可以通過單獨的磁盤做IO讀寫分離。
  • Zookeeper存儲Pulsar和Bookkeeper的所有元數據。如果Zookeeper不可用整個Pulsar將不可用。
  • 存儲可以單獨擴展。如果存儲是瓶頸,那麼只需要添加更多的Bookies,他們會自動承擔負載不需要Rebalance。

圖11.概念的總結 圖11.概念的總結