Apache Pulsar 是 Yahoo 開源的下一代分佈式消息系統,在2018年9月從 Apache 軟件基金會畢業成爲頂級項目。Pulsar 特有的分層分片的架構,在保證大數據消息流系統的性能和吞吐量的同時,也提供了高可用性、高可擴展性和易維護性。算法
分片架構將消息流數據的存儲粒度從分區拉低到了分片,以及相應的層級化存儲,使 Pulsar 成爲 unbounded streaming data storage 的不二之選。這使得 Pulsar 能夠更完美地匹配和適配 Flink 的批流一體的計算模式。數據庫
隨着開源後,各行業企業能夠根據不一樣需求,爲 Pulsar 賦予更豐富的功能,因此目前它也再也不只是中間件的功能,而是慢慢發展成爲一個 Event Streaming Platform(事件流處理平臺),具備 Connect(鏈接)、Store(存儲)和 Process(處理)功能。json
在鏈接方面,Pulsar 具備本身單獨的 Pub/Sub 模型,能夠同時知足 Kafka 和 RocketMQ 的應用場景。同時 Pulsar IO 的功能,其實就是 Connector,能夠很是方便地將數據源導入到 Pulsar 或從 Pulsar 導出等。緩存
另外,在Pulsar 2.5.0 中,咱們新增了一個重要機制:Protocol handler。這個機制支持在 broker 自定義添加額外的協議支持,能夠保證在不更改原數據庫的基礎上,也能享用 Pulsar 的一些高級功能。因此 Pulsar 也延展出好比:KoP、ActiveMQ、Rest 等。網絡
Pulsar 提供了可讓用戶導入的途徑後就必然須要考慮在 Pulsar 上進行存儲。Pulsar 採用的是分佈式存儲,最開始是在 Apache BookKeeper 上進行。後來添加了更多的層級存儲,經過 JCloud 和 HDFS 等多種模式進行存儲的選擇。固然,層級存儲也受限於存儲容量。多線程
Pulsar 提供了一個無限存儲的抽象,方便第三方平臺進行更好的批流融合的計算。即 Pulsar 的數據處理能力。Pulsar 的數據處理能力其實是按照你數據計算的難易程度、實效性等進行了切分。架構
目前 Pulsar 包含如下幾類集成融合處理方式:併發
從使用來看,Pulsar 的用法與傳統的消息系統相似,是基於發佈-訂閱模型的。使用者被分爲生產者(Producer)和消費者(Consumer)兩個角色,對於更具體的需求,還能夠以 Reader 的角色來消費數據。用戶能夠以生產者的身份將數據發佈在特定的主題之下,也能夠以消費者的身份訂閱(Subscription)特定的主題,從而獲取數據。在這個過程當中,Pulsar 實現了數據的持久化與數據分發,Pulsar 還提供了Schema 功能,可以對數據進行驗證。分佈式
以下圖所示,Pulsar 裏面有幾種訂閱模式:函數
Pulsar 裏的主題分紅兩類,一類是分區主題(Partitioned Topic),一類是非分區主題(Not Partitioned Topic)。
分區主題其實是由多個非分區主題組成的。主題和分區都是邏輯上的概念,咱們能夠把主題看做是一個大的無限的事件流,被分區切分紅幾條小的無限事件流。
而對應的,在物理上,Pulsar 採用分層結構。每一條事件流存儲在一個 Segment 中,每一個Segment 包括了許多個Entry,Entry 裏面存放的纔是用戶發送過來的一條或多條消息實體。
Message 是 Entry 中存放的數據,也是 Pulsar 中消費者消費一次得到的數據。Message 中除了包括字節流數據,還有 Key 屬性,兩種時間屬性和 MessageId 以及其餘信息。MessageId 是消息的惟一標識,包括了ledger-id、entry-id、 batch-index、 partition-index 的信息,以下圖,分別記錄了消息在Pulsar 中的Segment、Entry、Message、Partition 存儲位置, 所以也能夠據此從物理上找到Message的信息內容。
一個 Pulsar 集羣由 Brokers 集羣和 Bookies 集羣組成。Brokers 之間是相互獨立的,負責向生產者和消費者提供關於某個主題的服務。Bookies 之間也是相互獨立的,負責存儲 Segment 的數據,是消息持久化的地方。爲了管理配置信息和代理信息,Pulsar 還藉助了 Zookeeper 這個組件,Brokers 和 Bookies 都會在 zookeeper 上註冊,下面從消息的具體讀寫路徑(見下圖)來介紹 Pulsar 的結構。
在寫路徑中,生產者建立併發送一條消息到主題中,該消息可能會以某種算法(好比Round robin)被路由到一個具體的分區上,Pulsar 會選擇一個Broker 爲這個分區服務,該分區的消息實際會被髮送到這個 Broker上。當Broker 拿到一條消息,它會以 Write Quorum (Qw)的方式將消息寫入到 Bookies 中。當成功寫入到 Bookies 的數量達到設定時,Broker 會收到完成通知,而且 Broker 也會返回通知生產者寫入成功。
在讀路徑中,消費者首先要發起一次訂閱,以後才能與主題對應的 Broker 進行鏈接,Broker 從 Bookies 請求數據併發送給消費者。當數據接受成功,消費者能夠選擇向 Broker 發送確認信息,使得 Broker 可以更新消費者的訪問位置信息。前面也提到,對於剛寫入的數據,Pulsar 會存儲在緩存中,那麼就能夠直接從 Brokers 的緩存中讀取了,縮短了讀取路徑。
Pulsar 將存儲與服務相分離,實現了很好的可拓展性,在平臺層面,可以經過調整Bookies 的數量來知足不一樣的需求。在用戶層面,只須要跟 Brokers 通訊,而Brokers 自己被設計成沒有狀態的,當某個 Broker 因故障沒法使用時,能夠動態的生成一個新的 Broker 來替換。
首先,Pulsar Connector 在使用上是比較簡單的,由一個 Source 和一個 Sink 組成,source 的功能就是將一個或多個主題下的消息傳入到 Flink 的Source中,Sink的功能就是從 Flink 的 Sink 中獲取數據並放入到某些主題下,在使用方式上,以下所示,與 Kafa Connector 很類似,使用時須要設置一些參數。
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("topic", "test-source-topic") FlinkPulsarSource<String> source = new FlinkPulsarSource<>( serviceUrl, adminUrl, new SimpleStringSchema(), props); DataStream<String> stream = see.addSource(source); FlinkPulsarSink<Person> sink = new FlinkPulsarSink( serviceUrl, adminUrl, Optional.of(topic), // mandatory target topic props, TopicKeyExtractor.NULL, // replace this to extract key or topic for each record Person.class); stream.addSink(sink);
如今介紹 Kulsar Connector 一些特性的實現機制。
由於 Pulsar 中的 MessageId 是全局惟一且有序的,與消息在 Pulsar 中的物理存儲也對應,所以爲了實現 Exactly Once,Pulsar Connector 藉助 Flink 的 Checkpoint 機制,將 MessageId 存儲到 Checkpoint。
對於鏈接器的 Source 任務,在每次觸發 Checkpoint 的時候,會將各個分區當前處理的 MessageId 保存到狀態存儲裏面,這樣在任務重啓的時候,每一個分區均可以經過 Pulsar 提供的 Reader seek 接口找到 MessageId 對應的消息位置,而後從這個位置以後讀取消息數據。
經過 Checkpoint 機制,還可以向存儲數據的節點發送數據使用完畢的通知,從而能準確刪除過時的數據,作到存儲的合理利用。
考慮到Flink中的任務都是長時間運行的,在運行任務的過程當中,用戶也許會須要動態的增長部分主題或者分區,Pulsar Connector 提供了自動發現的解決方案。
Pulsar 的策略是另外啓動一個線程,按期的去查詢設定的主題是否改變,分區有沒有增刪,若是發生了新增分區的狀況,那麼就額外建立新的Reader 任務去完成主題下的數據的反序列化,固然若是是刪除分區,也會相應的減小讀取任務。
在讀取主題下的數據的過程當中,咱們能夠將數據轉化成一條條結構化的記錄來處理。Pulsar 支持 Avro schema and avro/json/protobuf Message 格式類型的數據轉化成 Flink 中的 Row格式數據。對於用戶關心的元數據,Pulsar 也在 Row 中提供了對應的元數據域。
另外,Pulsar 基於 Flink 1.9 版本進行了新的開發,支持 Table API 和 Catalog,Pulsar 作了一個簡單的映射,以下圖所示,將 Pulsar 的租戶/命名空間對應到 Catalog 的數據庫,將主題對應爲庫中的具體表。
首先,以前提到 Pulsar 將數據存儲在 Bookeeper 中,還能夠導入到 Hdfs 或者 S3 這樣的文件系統中,但對於分析型應用來講,咱們每每只關心全部數據中每條數據的部分屬性,所以採用列存儲的方式對 IO 和網絡都會有性能提高,Pulsar 也在嘗試在Segment 中以列的方式存儲。
其次,在原來的讀路徑中,不論是 Reader 仍是Comsumer,都須要經過 Brokers 來傳遞數據。若是採用新的 Bypass Broker方式,經過查詢元數據,就能直接找到每條 Message 存儲的 Bookie 位置,這樣能夠直接從 Bookie 讀取數據,縮短讀取路徑,從而提高效率。
最後,Pulsar 相對 Kafka 來講,因爲數據在物理上是存放在一個個 Segment 中的,那麼在讀取的過程當中,經過提升並行化的方式,創建多線程同時讀取多個 Segment,就可以提高整個做業的完成效率,不過這也須要你的任務自身對每一個Topic 分區的訪問順序沒有嚴格要求,而且對於新產生的數據,是不保存在 Segement 的,仍是須要作緩存的訪問來獲取數據,所以,並行讀取將成爲一個可選項,爲用戶提供更多的選擇方案。