Flink 生態:Pulsar Connector 機制剖析

Apache Pulsar 是 Yahoo 開源的下一代分佈式消息系統,在2018年9月從 Apache 軟件基金會畢業成爲頂級項目。Pulsar 特有的分層分片的架構,在保證大數據消息流系統的性能和吞吐量的同時,也提供了高可用性、高可擴展性和易維護性。算法

分片架構將消息流數據的存儲粒度從分區拉低到了分片,以及相應的層級化存儲,使 Pulsar 成爲 unbounded streaming data storage 的不二之選。這使得 Pulsar 能夠更完美地匹配和適配 Flink 的批流一體的計算模式。數據庫

1. Pulsar 簡介

1.1 特色

隨着開源後,各行業企業能夠根據不一樣需求,爲 Pulsar 賦予更豐富的功能,因此目前它也再也不只是中間件的功能,而是慢慢發展成爲一個 Event Streaming Platform(事件流處理平臺),具備 Connect(鏈接)、Store(存儲)和 Process(處理)功能。json

■ Connect

在鏈接方面,Pulsar 具備本身單獨的 Pub/Sub 模型,能夠同時知足 Kafka 和 RocketMQ 的應用場景。同時 Pulsar IO 的功能,其實就是 Connector,能夠很是方便地將數據源導入到 Pulsar 或從 Pulsar 導出等。緩存

另外,在Pulsar 2.5.0 中,咱們新增了一個重要機制:Protocol handler。這個機制支持在 broker 自定義添加額外的協議支持,能夠保證在不更改原數據庫的基礎上,也能享用 Pulsar 的一些高級功能。因此 Pulsar 也延展出好比:KoP、ActiveMQ、Rest 等。網絡

■ Store

Pulsar 提供了可讓用戶導入的途徑後就必然須要考慮在 Pulsar 上進行存儲。Pulsar 採用的是分佈式存儲,最開始是在 Apache BookKeeper 上進行。後來添加了更多的層級存儲,經過 JCloud 和 HDFS 等多種模式進行存儲的選擇。固然,層級存儲也受限於存儲容量。多線程

■ Process

Pulsar 提供了一個無限存儲的抽象,方便第三方平臺進行更好的批流融合的計算。即 Pulsar 的數據處理能力。Pulsar 的數據處理能力其實是按照你數據計算的難易程度、實效性等進行了切分。架構

目前 Pulsar 包含如下幾類集成融合處理方式:併發

  • Pulsar Function:Pulsar 自帶的函數處理,經過不一樣系統端的函數編寫,便可完成計算並運用到 Pulsar 中。
  • Pulsar-Flink connector 和 Pulsar-Spark connector:做爲批流融合計算引擎,Flink 和 Spark 都提供流計算的機制。若是你已經在使用他們了,那恭喜你。由於 Pulsar 也所有支持這兩種計算,無需你再進行多餘的操做了。
  • Presto (Pulsar SQL):有的朋友會在應用場景中更多的使用 SQL,進行交互式查詢等。Pulsar 與 Presto 有很好的集成處理,能夠用 SQL 在 Pulsar 進行處理。

ͼƬ1.png

1.2 訂閱模型

從使用來看,Pulsar 的用法與傳統的消息系統相似,是基於發佈-訂閱模型的。使用者被分爲生產者(Producer)和消費者(Consumer)兩個角色,對於更具體的需求,還能夠以 Reader 的角色來消費數據。用戶能夠以生產者的身份將數據發佈在特定的主題之下,也能夠以消費者的身份訂閱(Subscription)特定的主題,從而獲取數據。在這個過程當中,Pulsar 實現了數據的持久化與數據分發,Pulsar 還提供了Schema 功能,可以對數據進行驗證。分佈式

以下圖所示,Pulsar 裏面有幾種訂閱模式:函數

  1. 獨佔訂閱(Exclusive)
  2. 故障轉移訂閱(Failover)
  3. 共享訂閱(Shared)
  4. Key保序共享訂閱(Key_shared)

ͼƬ2.png

ͼƬ3.png

Pulsar 裏的主題分紅兩類,一類是分區主題(Partitioned Topic),一類是非分區主題(Not Partitioned Topic)。

分區主題其實是由多個非分區主題組成的。主題和分區都是邏輯上的概念,咱們能夠把主題看做是一個大的無限的事件流,被分區切分紅幾條小的無限事件流。

而對應的,在物理上,Pulsar 採用分層結構。每一條事件流存儲在一個 Segment 中,每一個Segment 包括了許多個Entry,Entry 裏面存放的纔是用戶發送過來的一條或多條消息實體。

Message 是 Entry 中存放的數據,也是 Pulsar 中消費者消費一次得到的數據。Message 中除了包括字節流數據,還有 Key 屬性,兩種時間屬性和 MessageId 以及其餘信息。MessageId 是消息的惟一標識,包括了ledger-id、entry-id、 batch-index、 partition-index 的信息,以下圖,分別記錄了消息在Pulsar 中的Segment、Entry、Message、Partition 存儲位置, 所以也能夠據此從物理上找到Message的信息內容。

ͼƬ4.png

2. Pulsar 架構

一個 Pulsar 集羣由 Brokers 集羣和 Bookies 集羣組成。Brokers 之間是相互獨立的,負責向生產者和消費者提供關於某個主題的服務。Bookies 之間也是相互獨立的,負責存儲 Segment 的數據,是消息持久化的地方。爲了管理配置信息和代理信息,Pulsar 還藉助了 Zookeeper 這個組件,Brokers 和 Bookies 都會在 zookeeper 上註冊,下面從消息的具體讀寫路徑(見下圖)來介紹 Pulsar 的結構。

ͼƬ5.png

ͼƬ6.png

在寫路徑中,生產者建立併發送一條消息到主題中,該消息可能會以某種算法(好比Round robin)被路由到一個具體的分區上,Pulsar 會選擇一個Broker 爲這個分區服務,該分區的消息實際會被髮送到這個 Broker上。當Broker 拿到一條消息,它會以 Write Quorum (Qw)的方式將消息寫入到 Bookies 中。當成功寫入到 Bookies 的數量達到設定時,Broker 會收到完成通知,而且 Broker 也會返回通知生產者寫入成功。

在讀路徑中,消費者首先要發起一次訂閱,以後才能與主題對應的 Broker 進行鏈接,Broker 從 Bookies 請求數據併發送給消費者。當數據接受成功,消費者能夠選擇向 Broker 發送確認信息,使得 Broker 可以更新消費者的訪問位置信息。前面也提到,對於剛寫入的數據,Pulsar 會存儲在緩存中,那麼就能夠直接從 Brokers 的緩存中讀取了,縮短了讀取路徑。

Pulsar 將存儲與服務相分離,實現了很好的可拓展性,在平臺層面,可以經過調整Bookies 的數量來知足不一樣的需求。在用戶層面,只須要跟 Brokers 通訊,而Brokers 自己被設計成沒有狀態的,當某個 Broker 因故障沒法使用時,能夠動態的生成一個新的 Broker 來替換。

3. Pulsar Connector 內部機制

首先,Pulsar Connector 在使用上是比較簡單的,由一個 Source 和一個 Sink 組成,source 的功能就是將一個或多個主題下的消息傳入到 Flink 的Source中,Sink的功能就是從 Flink 的 Sink 中獲取數據並放入到某些主題下,在使用方式上,以下所示,與 Kafa Connector 很類似,使用時須要設置一些參數。

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); 
props.setProperty("topic", "test-source-topic") FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
              serviceUrl, 
              adminUrl, 
              new SimpleStringSchema(), 
              props); 
DataStream<String> stream = see.addSource(source);

FlinkPulsarSink<Person> sink = 
      new FlinkPulsarSink(
              serviceUrl, 
              adminUrl, 
              Optional.of(topic), // mandatory target topic 
              props, 
              TopicKeyExtractor.NULL, // replace this to extract key or topic for each record
              Person.class); 
stream.addSink(sink);

如今介紹 Kulsar Connector 一些特性的實現機制。

3.1 精確一次

由於 Pulsar 中的 MessageId 是全局惟一且有序的,與消息在 Pulsar 中的物理存儲也對應,所以爲了實現 Exactly Once,Pulsar Connector 藉助 Flink 的 Checkpoint 機制,將 MessageId 存儲到 Checkpoint。

對於鏈接器的 Source 任務,在每次觸發 Checkpoint 的時候,會將各個分區當前處理的 MessageId 保存到狀態存儲裏面,這樣在任務重啓的時候,每一個分區均可以經過 Pulsar 提供的 Reader seek 接口找到 MessageId 對應的消息位置,而後從這個位置以後讀取消息數據。

經過 Checkpoint 機制,還可以向存儲數據的節點發送數據使用完畢的通知,從而能準確刪除過時的數據,作到存儲的合理利用。

3.2 動態發現

考慮到Flink中的任務都是長時間運行的,在運行任務的過程當中,用戶也許會須要動態的增長部分主題或者分區,Pulsar Connector 提供了自動發現的解決方案。

Pulsar 的策略是另外啓動一個線程,按期的去查詢設定的主題是否改變,分區有沒有增刪,若是發生了新增分區的狀況,那麼就額外建立新的Reader 任務去完成主題下的數據的反序列化,固然若是是刪除分區,也會相應的減小讀取任務。

3.3 結構化數據

在讀取主題下的數據的過程當中,咱們能夠將數據轉化成一條條結構化的記錄來處理。Pulsar 支持 Avro schema and avro/json/protobuf Message 格式類型的數據轉化成 Flink 中的 Row格式數據。對於用戶關心的元數據,Pulsar 也在 Row 中提供了對應的元數據域。

另外,Pulsar 基於 Flink 1.9 版本進行了新的開發,支持 Table API 和 Catalog,Pulsar 作了一個簡單的映射,以下圖所示,將 Pulsar 的租戶/命名空間對應到 Catalog 的數據庫,將主題對應爲庫中的具體表。

ͼƬ7.png

4. 將來規劃

首先,以前提到 Pulsar 將數據存儲在 Bookeeper 中,還能夠導入到 Hdfs 或者 S3 這樣的文件系統中,但對於分析型應用來講,咱們每每只關心全部數據中每條數據的部分屬性,所以採用列存儲的方式對 IO 和網絡都會有性能提高,Pulsar 也在嘗試在Segment 中以列的方式存儲。

其次,在原來的讀路徑中,不論是 Reader 仍是Comsumer,都須要經過 Brokers 來傳遞數據。若是採用新的 Bypass Broker方式,經過查詢元數據,就能直接找到每條 Message 存儲的 Bookie 位置,這樣能夠直接從 Bookie 讀取數據,縮短讀取路徑,從而提高效率。

最後,Pulsar 相對 Kafka 來講,因爲數據在物理上是存放在一個個 Segment 中的,那麼在讀取的過程當中,經過提升並行化的方式,創建多線程同時讀取多個 Segment,就可以提高整個做業的完成效率,不過這也須要你的任務自身對每一個Topic 分區的訪問順序沒有嚴格要求,而且對於新產生的數據,是不保存在 Segement 的,仍是須要作緩存的訪問來獲取數據,所以,並行讀取將成爲一個可選項,爲用戶提供更多的選擇方案。

相關文章
相關標籤/搜索