開源實時數據處理系統Pulsar:一套搞定Kafka+Flink+DB


編輯| Debra前端

AI 前線導讀:實時數據處理在各個行業和領域中已經變得愈來愈關鍵。可是在實時數據棧中,消息、計算和存儲三個部分的分離,給方案的實現帶來了高複雜性,低可維護性,低效率等問題。

更多幹貨內容請關注微信公衆號「AI 前線」,(ID:ai-front)

本文整理自 Streamlio 核心創始人翟佳在 QCon2018 北京站的演講,在本次演講中,翟佳介紹了 Apache Pulsar 的架構、特性和其生態系統的組成,並展現了 Apache Pulsar 在消息、計算和存儲三個方面進行的協調、抽象和統一。java

  • Messaging:Pulsar 對 pub/sub 和 queue 兩種模式提供統一的支持,同時保證了一致性,高性能和易擴展性。git

  • Computing:Pulsar 內部的 Pulsar-Functions 提供了 Stream-native 的輕量級計算框架,保證了數據的即時流式處理。github

  • Storage:Pulsar 藉助 Apache BookKeeper 提供了以 segment 爲中心的存儲架構,保證了存儲的性能,持久性和彈性。apache

實時數據處理在剛剛興起的時候,通常企業會採用λ架構,維護兩套系統:一套用來處理實時的數據;另外一套用 batch 的方式處理歷史數據。兩套系統帶來了資源的冗餘佔用和維護的不便。微信

爲了消除冗餘,逐漸演化出κ架構,使用一套系統來知足實時數據處理和歷史數據處理的需求。網絡

無論是λ架構仍是κ架構,在實時處理的系統中,系統的核心由消息、計算和存儲三個子系統組成,好比消息系統有 Kafka、RabbitMQ、Flume 等;計算系統有 Spark Streaming、Flink、Heron 等;存儲系統有各類分佈式的文件系統,DB、K/V store 等。 因爲三個部分中,每一個部分都有相應的不一樣產品,三個部分之間也相互分隔和獨立不多關聯,這帶來了一些問題,好比須要更多人力維護,部署複雜,調優難度大,監管難,數據丟失風險大等等。數據結構

爲何要選擇 Apache Pulsar?

面對消息,存儲和計算三個部分分隔的現狀,Apache Pulsar 在這三個方面進行了很好的協調、抽象和統一。 具體到 Apache Pulsar 內部,消息部分由 Pulsar Broker 來負責;存儲部分使用了 Apache BookKeeper,計算部分由 Pulsar Functions 來負責。架構

Apache Pulsar 是 2016 年 yahoo 開源的下一代大規模分佈式消息系統,目前在 Apache 基金會下孵化。在 Yahoo 的生產環境中大規模部署並使用了近 4 年,服務於 Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa 以及 Yahoo 的 KV 存儲等,在 Yahoo 全球 8 個數據中心之間維護了全聯通的複製,幷包含了 200 多萬個 Topics。併發

Apache Pulsar 有幾個明顯區別於其餘消息系統的特色:

  • 優秀的數據持久性和順序性。每一條消息都提供了全局惟一的 ID,多副本,並都是在實時刷盤後再返回給用戶。

  • 統一的消費模型: 支持 Stream(如 Kafka)和 Queue(如 RabbitMQ)兩種消費模型, 支持 exclusive、failover 和 shared 三種消費模式。

  • 靈活的擴展性: 節點擴展的線性和瞬時完成,在擴展中不會有數據的拷貝和遷移。

  • 高吞吐低延遲,在實時刷盤的前提下,依然提供了高帶寬(180 萬 messages/ 秒)和低延遲(5ms at 99%)。

除了這些特性,Apache Pulsar 也具有了優秀的企業級特性,好比多機房互聯互備(Geo-replication),多租戶等。

Apache Pulsar 在架構上最明顯的優點是採用了消息服務和消息存儲分層的策略。它包括了無狀態的消息服務層(broker 節點)和消息存儲層(BookKeeper 中 Bookie 是基本的存儲節點)。這爲系統帶來了極好的擴展性和健壯性。

在消息服務層和存儲層,系統所關注的內容是不同的: 在服務層更多的是對 Producer 和 Consumer 的支持,更關注用戶接口和消息的服務質量,須要更好的 CPU 和網絡帶寬來支持消息的扇入扇出。存儲層更關注磁盤 IOPS 和存儲容量,負責數據的持久化等。

分層的架構帶爲服務和存儲兩層都帶來了線性、瞬時的擴展性。若是須要增長和支持更多的 Producer 和 Consumer,只用對 broker 進行 Scale。若是存儲空間緊張,或者想要消息的時間保持的時間更長,能夠單獨增長存儲節點 Bookie。

在服務層中,broker 不會有相關的數據被持久化保存,是無狀態的。對 Topic 的服務能夠很容易地遷移。若是 broker 失效,能夠很容易地將 topic 遷移到健康的 broker。

在存儲層(Bookie)也是同樣。每一個 topic 的數據被打散並均勻 partition 到多個 segment,每一個 segment 的數據又被分散存儲在 Bookie 集羣中。當想增長容量的時候,只須要添加新的 Bookie,數據會優先選擇剛加入的 Bookie。

一樣當 broker 被 overloaded,添加新的 broker 以後,負載會被均衡地分配到新添加的 broker 之上。

介紹完 Apache Pulsar 的整體架構和特性,下面會從消息、存儲和計算三個方面分別介紹 Apache Pulsar 的設計理念,各層內部以及各層之間的協調、抽象和統一。

Apache Pulsar 的消息層

Apache Pulsar 面向用戶的也是最簡單的三個概念: 主題 Topic、生產者 Producer 和消費者 Consumer。 Topic 是消息的一個通道和載體; Producer 產生數據並向 Topic 這個通道中發送數據; Consumer 從 Topic 中獲取並消費數據。

在 Apache Pulsar 中提供了對 Namespace 的支持。Namespace 是 ApachePulsar 的多租戶機制中重要的組成部分。在一個 Topic 的名字中,包含了:租戶 (Tenant) ,命名空間(namespace)和 Topic 名字,這樣就能夠對全部的 topic 提供層級化的管理。

Tenant 表明系統裏的租戶。假設有一個 Pulsar 集羣被多個組織共享,集羣裏的每一個 Tenant 能夠表明一個組織的團隊、一個核心的功能或一個產品線。一個 Tenant 能夠包含多個 namespace,一個 namespace 能夠包含多個主題。

Tenant 是資源的隔離的單位。namespace 是資源使用和權限設置的單位,咱們能夠設置權限、調整複製選項、管理跨集羣的數據複製、控制消息的過時時間等。namespace 下的 Topic 會繼承 namespace 的配置。若是用戶獲取了 namespace 的寫入權限就能夠往 namespace 寫入數據,若是要寫入的 topic 不存在,就會建立該 topic。

爲了支持異地多備,namespace 又分爲兩種,一種是本地的,只在集羣內可見;一種是全局的,對多個集羣可見。能夠在不一樣的數據中心之間進行數據的交互和互備。

Apache Pulsar 的每一個 namespace 能夠包含多個 topic,而每一個 topic 能夠有多個生產者和訂閱者。每一個訂閱者能夠接受 topic 的全部的消息。爲了給應用程序提供更大的靈活性,Apache Pulsar 經過增長一層 subscription 的抽象,提供了統一的消費模式。 消息的傳遞路徑是 producer-topic-subscription-consumer。subscription 相似 Kafka 中 consumer group 的概念。

Apache Pulsar 支持 exclusive、failover 和 shared 三種訂閱類型,它們能夠共存在同一個 topic 上。數據雖然只寫了一次,可是能夠經過三種的消費方式被屢次消費。

前兩種 exclusive 和 failover,都是 Streaming 的模型,只有一個 consumer 來消費一個 topic partition 中的全部數據,都能保證嚴格的順序。Kafka 和 Kinesis 也是這種消費模型(一個 consumer 消費一個 partition)。

Exclusive 是隻能有一個 consumer 來消費一個 topic 中的數據,不容許其餘的 consumer 加入;failover 是容許多個 consumer 和一個 subscription 關聯,當 master consumer 失效後,能夠有另外的 consumer 來接管成爲新的 master。

第三種是 shared 的消費模式,它屬於 Queue 的模式,常見的 RabbitMQ、ActiveMQ 均屬於這種模式。若是三個 consumer 共同訂閱同一個 subscription,每一個 consumer 大概會消費這個 topic 中的三分之一的數據,若是想ß增長消費的帶寬,只用單獨增長 consumer 的數量而不須要改變 topic 和 partition,很是實用於一些 consumer 處理複雜度比較高的場景,好比視頻,圖片處理等。

除了這三種消費模式,Apache Pulsar 還提供了 reader 的 API 來讀取消息,讓用戶能夠更加靈活的控制和消費消息。

Apache Pulsar 提供了兩種 ack 的機制: 累積(cumulative)模式和單條(individual)模式。

Ack 機制在在消息系統中是很是重要的。消息系統中的 broker 和 consumer 可能會出錯或宕機,當有錯誤發生的時候,若是可以獲取上次消費者消費的位置,而後從這個消費的位置再接着消費,這是很是有用的,這樣能夠避免丟失數據,避免把全部的處理過的數據再處理一遍。

通常經過 message acknowledgement、committing offset 來標記消息的消費狀況。

Kafka 中經過 offset 來簡單的管理 ack,記錄一個 partition 的消費位置。

Pulsar 經過維護一個專門的數據結構 ManagedCursor 來管理 ack 的信息,每次 ack 的改變都會被持久化到硬盤中。

對於 cumulative 的 ack,在標記的消息以前,全部的數據都被消費過了;遇到出錯的狀況會從標記的位置再開始消費。

對於 individual 的消費模式,會單獨標記已經被消費過的消息;遇到出錯的狀況,全部的未被標記 ack 的消息都會被從新發送。Individual 的 ack 模式主要支持 share 的消費模式。它是頗有必要的,由於對通常的 share 的消費模式,都是單個的消息消費處理比較慢,因此才增長 consumer。單獨的標記,能在出錯的時候減小沒必要要的昂貴的處理。

消息的 retention 策略,管理着消息何時被刪除。 其餘的系統大可能是經過時間來控制。有可能時間到了,但消息沒有被消費,也被刪除了。

Apache Pulsar 中,提供了比較全面的 retention 策略。通常狀況下,藉助 ack 的信息,當全部 subscription 都消費了消息以後,消息纔會刪除。數據還能夠額外的設置 retention period,即便都消費了也能再將消息保存一段時間。另外也支持 TTL 的模式。

對於留在 backlog 中的消息,Apache Pulsar 也提供了多種策略,包括 producer-request-hold、producer-exception、consumer-backlog-eviction 等。在 backlog 的 quota 達到時,供用戶選擇怎麼處理新的消息和在 backlog 中的消息。

Apache Pulsar 的存儲層

接下來咱們來看一下 Apache Pulsar 的存儲層,也就是 Apache BookKeeper。Apache BookKeeper 在 2011 年開源,並隨後加入 Apache,成爲 Apache 的頂級項目。BookKeeper 是分佈式的是一個可擴展的、高可用、低延遲的專門爲實時系統優化過的存儲系統。更多系統能夠參考 BookKeeper 的網站 https://bookkeeper.apache.org/ 和 github:https://github.com/apache/bookkeeper。

Apache BookKeeper 爲 Pulsar 系統提供了一個以 Segment(BookKeeper ledger)爲存儲單元的存儲服務。BookKeeper 的存儲節點稱做一個 Bookie。

  • BookKeeper 爲 append-only 的寫入模式提供了優化,經過獨特的設計提供了高帶寬和低延遲。

  • BookKeeper 提供了強一致性和順序性。經過實時刷盤和多備份保證數據的持久性。順序性經過記錄自己攜帶的全局惟一順序 ID 來保證的。這樣對不少對順序要求比較高的應用場景。

  • 高可用是說數據會同時寫入多個 bookie 上,若是 bookie 發生錯誤,即便只有一臺包含數據的 bookie 可用,仍能爲應用提供服務,在其餘 bookie 恢復或有新的 bookie 加入後,會自動檢查並補全所須要的數據備份。

  • IO 隔離,對於 Bookie 的讀和寫是分別發生在不一樣的磁盤上的。這樣不依賴於文件系統和 pagecache 的設計,能保證即便有大量的讀的同時,也能保證寫的高帶寬和低延遲;在大量的寫入的同時,讀請求的服務質量也能獲得保證。這也是能保證多租戶的一個關鍵。

一個 BookKeeper 的集羣由多個 Bookie 節點構成。每一個 Bookie 負責具體的數據存儲。當用戶的 application 要使用 bk 的時候,會設定三個參數,ensemble size(用戶要使用幾臺 bookie)、write quorum(寫入的數據要保留幾個備份)和 ack quorum(每次的寫入操做,有幾個成功後就返回)。Bookie 採用 quorum-vote 的模式,當寫一條數據時,數據同時併發的寫到全部的 write quorum 的 bookie 中,當指定的 ack quorum 返回後,bookie 認爲寫成功,返回。

當 ensemble 中有 bookie 出錯,會從 cluster 中尋找其餘可用的 bookie,進行替換。而後後臺有 autorecovery 作數據的自動恢復,對用戶透明。

BookKeeper 的一個特性是存儲是以 Segment(在 BookKeeper 內部被稱做 ledger)爲存儲的基本單元。每一個 Segment 甚至到每一個消息的粒度,都會被均勻分散到 BookKeeper 的集羣中。保證了數據和服務在多個 Bookie 上的均勻性。經過這張圖,咱們經過簡單對比 Pulsar 和 Kafka 中的 partition 的存儲過程,對 Pulsar 有一個更好的理解。

Pulsar 和 Kafka 都是基於 partition 的邏輯概念來作作 topic 的存儲。最根本的不一樣是,Kafka 的物理存儲也是以 partition 爲單位的,每一個 partition 必須做爲一個總體(一個目錄)被存儲在某一個 broker 上。 而 Pulsar 的每一個 partition 是以 segment 做爲物理存儲的單位,Pulsar 中的每一個 partition 會再被打散並均勻分散到多個 bookie 節點中。

這樣的一個直接的影響是,Kafka 的 partition 的大小,受制於單臺 broker 的存儲;而 Pulsar 的一個 partition 則能夠利用整個集羣的存儲容量。

當 partition 的容量上限達到後,須要擴容的時候,若是現有的單臺機器不能知足,Kafka 可能須要添加新的存儲節點,將 partition 的數據搬移到更大的節點上。可是 Pulsar 只用添加新的 Bookie 存儲節點,新加入的節點因爲剩餘的空間大,會被優先使用,更多的接收新的數據;並且其中不會涉及到任何的老的數據的拷貝和搬移。

Pulsar 在單個節點失敗時也會體現一樣的優點。若是 Pulsar 的服務節點 broker 失效,因爲 broker 是無狀態的,其餘的 broker 能夠很快的接管 topic,不會涉及 topic 數據的拷貝;若是存儲節點 Bookie 失效,集羣中其餘的 Bookie 會從多個 Bookie 節點中併發讀取數據,並對失效節點的數據自動進行數據的恢復,不會對前端的服務有影響。

Apache BookKeeper 內部除了基礎的的 Segment(ledger), 還提供了 Stream 和 Table 兩種服務。 Segment 能夠簡單理解爲一段複製日誌。Stream 服務是經過必定的方式,將一組 Segment 按照順序共同管理起來,這樣就能夠組成一個源源不斷的流。進而,若是咱們用 Stream 來做爲一個 Table 的 change log,實現了一個簡單的 K/V Store,也就是這裏說的 Table 的服務。在實時處理的過程當中,好比 Pulsar Functions 的處理過程當中,須要使用 K/V 的 Table 來存取計算的中間狀態。

經過在 BookKeeper 內部提供 Stream 和 Table 兩種服務,能夠很方便的知足在實時數據處理中的絕大部分的存儲需求。

Apache Pulsar 的計算層

介紹完 Pulsar 中的消息和存儲,下面咱們來了解一下 Pulsar 中的計算部分 – Pulsar Functions。介紹一下 Pulsar Functions 的設計和實現。看看 Pulsar Functions 和其餘的計算引擎不一樣的地方。

首先咱們看一個計算引擎最本質的是要解決什麼問題。 首先用戶定了了一個計算的需求,也就是處理過程: f(x),一組輸入數據經過 f(x)的計算,獲得一組輸出的結果。

基於本質問題,計算引擎通過了長期的發展。第一代的計算引擎,以 Storm 爲表明的經過一個有向無環圖(DAG)來完成一組計算,一般須要大量的代碼編寫工做。如今大部分的計算引擎都提供第二代的 API,即經過 DSL 的方式。第二代的 API 相比第一代更加的緊湊和方便,可是仍是有些複雜,好比包含着大量的 map、flatmap 等。

咱們發現,在實時數據的處理中,有大部分(60%——80%)的計算過程,本質上都是一些很簡單的數據轉換,好比 ETL/Reactive Services/Classification/Real-time Aggregation/Event Routing/Microservices 等等。

另外,雲的興起,帶動了 serverless 的出現和興盛,Serverless 爲咱們提供了一個很好的思路。serverless 提供的是 function 的 API,每個事件觸發一次 function,多個 function 能夠經過組合的方式,完成比較複雜的邏輯。

基於這些緣由,咱們決定設計基於 Serverless 的,由消息來驅動的「Stream-native」的 Pulsar Functions。Pulsar Function 的一個特色是簡單:給用戶的接口簡單;每一個 Function 的實現也十分容易理解;提供多語言的接口(目前支持 Java 和 Python)。

另外一個特色是 Stream-native: Pulsar Functions 的輸入,輸出和中間的 log 都以 Topic 和消息爲中心。

Pulsar Functions 提供兩種 API,第一種是 SDK less 的 API,用戶不用依賴 Pulsar 的 sdk,只用實現 java.util.function.Function 的接口。第二種藉助 Pulsar SDK 的 API,經過 Context 來和 Pulsar 交互和定製。

和 Pulsar 的管理同樣,Pulsar Functions 也提供命令行和 Rest 兩種方式。執行的參數包括輸入的 topic,輸出的 topic 和要執行的 Function 的名字。

咱們能夠舉例說明一下 Pulsar Functions 適用的典型應用場景。

在邊緣計算(Edge Computing)中,傳感器會產生大量數據,並且數據會在邊緣的本地節點上進行不少簡單的處理,好比 Simple filtering, threshold detection, regex matching 等,另外邊緣節點的計算資源有限。 Pulsar Functions 對這樣的場景十分匹配。另外是在機器學習中。最開始的基礎模型經過離線進行計算和訓練。當訓練完,上線後,每個輸入,都會匹配和應用模型,並對模型進行調整。這十分匹配 Pulsar Functions 的消息驅動的模式。另外模型自己也可使用 BookKeeper 作存儲,簡化系統的部署。

這裏 Pulsar Functions 的特性作一個總結。

首先,Pulsar Function 能夠簡單運行在 Pulsar 的 broker 裏面,簡化系統的部署。輸入的 Topic 中的每個消息都會觸發對 Function 的執行。能夠支持多個 Topic 做爲輸入。用戶能夠控制 Function 執行的各類語義:AtMostOnce 是當 Function 收到消息後就進行 ACK;AtLeastOnce 是在 Function 對消息處理完成後才進行 ACK;ExactlyOnce 是經過 Pulsar 內部實現的 deDup 的策略來實現。 Pulsar Functions 可使用 BookKeeper 提供的 Stream 服務來作 Topic 的存儲,使用提供的 Table 服務來作中間狀態的存儲,實現存儲的統一,不須要部署其餘的系統。這爲系統的開發、測試、集成和運維帶來了更多的便利。

經過介紹 Pulsar 的消息,存儲和計算三個部分,但願能讓你們對 Pulsar 有更進一步的瞭解。在 Pulsar 的消息系統中,提供了基於 Stream 和 Queue 的統一的消費模式,提供了無狀態的 Broker 來提高系統的擴展性和容錯性。在存儲系統 BookKeeper 中,提供了對 Stream 的存儲和對 K/V Table 的存儲的統一,知足了實時處理系統中對 topic 和狀態的存儲需求。 在計算部分,Pulsar Functions 中基於消息驅動(stream-native),能夠計算和消息一種統一。

另外對於 Pulsar 系統和外部系統的互聯(connector),能夠看做是一種特殊的 Pulsar Functions。

Pulsar 及 Kafka 基準測試對比

這裏的 Benchmark(https://github.com/openmessaging/openmessaging-benchmark)是咱們和阿里一塊兒起草的 openMessaging 項目的一部分。若是有時間和機器,歡迎你們本身驗證一下。

這個 Benchmark 經過相同的配置,對 Apache Pulsar 和 Kafka 的帶寬和延遲進行了簡單的測試。


最大吞吐量測試

這個結果是分別測試了 Pulsar 和 Kafka 在通常模式和 Exactly-once 模式下的 Publish 帶寬。

在 1KB 消息大小下,Pulsar 的通常模式和 Exactly-once 模式下的帶寬都在 21 萬條 / 秒左右;Kafka 在通常模式和 Exactly-once 模式下的帶寬分別是 7 萬多條 / 秒和 5 萬多條 / 秒。

除了帶寬數值的區別,另外一方面是對 ExactlyOnce 的處理,Pulsar 經過自身的機制,幾乎相對於通常的 模式在性能上沒有區別。可是 Kafka 的兩種模式會有較大的差異。

時延測試

這個結果是 Pulsar 和 Kafka 在固定的 Public 帶寬(50K/ 秒)下,各個百分位消息的發佈時延。能夠看出 Kafka 在不到 99% 的百分位,時延就開始大幅上升,可是 Pulsar 在 99.9% 的百分位之後,時延纔開始上升。


這個結果是從時間軸的角度來看 Pulsar 和 Kafka 的時延。先不關注時延的絕對數值,直觀的感受是 Pulsar 的時延更加穩定;Kafka 的時延會有很大的波動。 這和 Pulsar 中的內存和對 GC 的優化有直接的關係。Apache Pulsar 是一個新興的下一代的消息系統,因爲 Pulsar Functions 的加入,和底層 Apache BookKeeper 提供的 Table 服務的完善,如今能夠認爲 Apache Pulsar 是一個在消息、存儲和計算三方面的統一的實時數據處理平臺。

Apache Pulsar 有不少先進的理念、設計和抽象在裏面。因爲時間關係有不少的部分沒能展開細講。

Apache Pulsar 和 Apache BookKeeper 中也有愈來愈多的有意思的 feature 和功能正在進行,公司和社區也都期待你們的關注和加入。若是你們有更多的關於 Meetup 和 POC 等需求,或者在使用其餘消息系統中遇到問題,能夠經過 Slack Channel 和微信聯繫咱們。

做者介紹

翟佳,Streamlio核心創始成員之一,畢業於中科院計算所,目前就任於一家下一代實時處理初創公司 Streamlio,是 Streamlio的核心創始成員之一。在此以前任職於 EMC,是北京 EMC實時處理平臺的技術負責人。主要從事實時計算和分佈式存儲系統的相關開發,是開源項目 Apache BookKeeper PMC Member和 Committer,也在 Apache Pulsar, Distributedlog等項目中持續貢獻代碼。

相關文章
相關標籤/搜索