Kafka 在華泰證券的探索與實踐

引言算法

Apache Kafka 發源於 LinkedIn,於 2011 年成爲 Apache 的孵化項目,隨後於 2012 年成爲 Apache 的頂級項目之一。按照官方定義,Kafka 是一個分佈式流平臺,具有流數據的發佈及訂閱(與消息隊列或企業級消息系統相似)能力、容錯方式的流數據存儲能力以及流數據的實時處理能力。Kafka 的優點在於:apache

 

  • 可靠性:具備分區機制、副本機制和容錯機制的分佈式消息系統。api

  • 可擴展性:消息系統支持集羣規模的熱擴展。緩存

  • 高性能:在數據發佈和訂閱過程當中都能保證數據的高吞吐量。即使在 TB 級數據存儲的狀況下,仍然能保證穩定的性能。安全


目前,Kafka 在互聯網、金融、傳統行業等各類類型公司內部普遍使用,已成爲全球範圍內實時數據傳輸和處理領域的事實標準。服務器

 

基本原理及概念網絡

一個典型的 Kafka 集羣中包含:(1)若干 Producer,用於生產數據;(2)若干 Broker,構成集羣吞吐數據;(3)若干 Consumer 消費數據;(4)一個 Zookeeper 集羣,進行全局控制和管理。Kafka 的拓撲結構以下圖所示:架構

 

圖1 kafka 架構圖運維

Kafka 經過 Zookeeper 管理集羣配置、選舉 leader,以及在 Consumer Group 發生變化時進行再平衡(rebalance)。Producer 使用 push 模式將消息發佈到 broker,Consumer 使用 pull 模式從 broker 訂閱並消費消息並更新消費的偏移量值(offset)。異步

基本概念:

  • Broker(代理):Kafka 集羣的服務器節點稱爲 broker。

  • Topic(主題):在 Kafka 中,使用一個類別屬性來劃分數據的所屬類,劃分數據的這個類稱爲 topic。一個主題能夠有零個、一個或多個消費者去訂閱寫到這個主題裏面的數據。

  • Partition(分區):主題中的數據分割爲一個或多個 partition,分區是一個有序、不變序列的記錄集合,經過不斷追加造成結構化的日誌。

  • Producer(生產者):數據的發佈者,該角色將消息發佈到 Kafka 的 topic 中。生產者負責選擇哪一個記錄分配到指定主題的哪一個分區中。

  • Consumer(消費者):從 broker 中讀取數據,消費者能夠消費多個 topic 中的數據。

  • Consumer Group(消費者組):每一個 consumer 都屬於一個特定的 group 組,一個 group 組能夠包含多個 consumer,但一個組中只會有一個 consumer 消費數據。

     

主題和分區:


Topic 的本質就是一個目錄,由一些 Partition Logs(分區日誌)組成,其組織結構以下圖所示。每一個 Partition 中的消息都是有序的,生產的消息被不斷追加到 Partition log 上,其中的每個消息都被賦予了一個惟一的 offset 值。

 

圖 2 Kafka分區數據存儲示意圖

對於傳統的 message queue 而言,通常會刪除已經被消費的消息,Kafka 集羣會保存全部的消息,無論消息有沒有被消費。Kafka 提供兩種策略刪除舊數據:(1)基於時間;(2)基於 Partition 文件大小。只有過時的數據纔會被自動清除以釋放磁盤空間。
Kafka 須要維持的元數據只有「已消費消息在 Partition 中的 offset 值」,Consumer 每消費一個消息,offset 就會加 1。其實消息的狀態徹底是由 Consumer 控制的,Consumer 能夠跟蹤和重設這個 offset 值,這樣 Consumer 就能夠讀取任意位置的消息。

數據備份機制:

 

Kafka 容許用戶爲每一個 topic 設置副本數量,副本數量決定了有幾個 broker 來存放寫入的數據。若是你的副本數量設置爲 3,那麼一份數據就會被存放在 3 臺不一樣的機器上,那麼就容許有 2 個機器失敗。通常推薦副本數量至少爲 2,這樣就能夠保證增減、重啓機器時不會影響到數據消費。若是對數據持久化有更高的要求,能夠把副本數量設置爲 3 或者更多。

核心api:


Producer API:容許應用去推送一個流記錄到一個或多個 kafka 主題上。


Consumer API:容許應用去訂閱一個或多個主題,並處理流數據。Consumer API 包含 high level API 和 Sample api 兩套。使用 high level API 時,同一 Topic 的一條消息只能被同一個 Consumer Group 內的一個 Consumer 消費,但多個 Consumer Group 可同時消費這一消息。與之相對的 Sampleapi 是一個底層的 API,徹底無狀態的,每次請求都須要指定 offset 值。


Streams API:容許應用做爲一個流處理器,消費來自一個或多個主題的輸入流,或生產一個輸出流到一個或多個輸出主題,並能夠有效地將輸入流轉換爲輸出流。
其它 Kafka 的特性將在下面華泰證券的使用示例中進一步介紹。

Kafka在華泰證券背景介紹及建設現狀

長期以來,華泰證券的系統建設依賴於服務廠商,廠商之間技術方案的差別性形成了系統之間的異構化,各類類型的系統架構長期存在,在消息中間件領域尤是如此。如短信平臺使用 IBMMQ,CRM 系統使用 ESB 架構,自營業務使用 Oracletuxedo 架構,櫃檯系統使用恆生 MessageCenter 架構等。隨着華泰證券自主研發的大規模投入,迫切須要改變這種煙囪式的系統建設方式,以統一化的服務化平臺架構來建設系統。


2015 年,咱們經過對 Kafka、ActiveMQ 及 RabbitMQ 等開源消息中間件進行全面的測試對比,最終從性能及高可用方面考慮,選擇 Kafka 做爲了公司級消息中間件,通過兩年多的探索和實踐,Kafka 平臺已承接大量重要生產業務系統,支撐了全公司業務的高速發展,積累了大量的生產實踐經驗。


通過將近三年的建設發展,目前在華泰證券內部已分別建設 0.9.0 和 0.10.1 版本的 Kafka 集羣,整體集羣數量 20 餘臺。


目前華泰內部 kafka 已爲行情計算、交易回報、量化分析等核心系統提供穩定服務,同時涵蓋了日誌、數據分析等諸多運維領域的應用,日均消息吞吐量達 2.3TB,峯值流量超 4.8Gb/s,TOPIC 數量 190 餘個,服務 30 個以上應用系統。

實踐經驗

(1)高可用雙活架構
如圖 3 所示,Kafka 高可用特性依賴於 zookeeper 來實現,因爲 zookeeper 的 paxos 算法特性,故 zookeeper 採用同城三中心部署方式,保證 zookeeper 自己高可用,一般其中兩個數據中心部署偶數臺機器,另外一數據中心部署單臺機器。


Kafkabroker 跨數據中心兩節點部署,全部 topic 的 partition 保證在兩中心都有副本。若是單數據中心出現問題,另外一箇中心能自動進行接管,業務系統能夠無感知切換。


因爲Kafka的高帶寬需求,主機採用萬兆網卡,而且在網卡作 bond0 以保證網卡高可用,跨數據中心之間的網絡通訊採用獨立的萬兆波分通道。

 

圖 3 KAFKA 平臺部署架構圖

(2)參數調優
• 首先咱們在 JVM 層面作了不少嘗試。對 Kafka 服務啓動參數進行調優,使用 G1 回收器。kafka 內存配置通常選擇 64G,其中 16G 給 Kafka 應用自己,剩餘內存所有用於操做系統自己的 page cache.


• 此外爲了保證核心系統的達到最佳的讀寫效果,咱們採用 SSD 硬盤,並作了 raid5 冗餘,來保證硬盤的高效 IO 讀寫能力。


• 其次咱們經過調整 broker 的 num.io.threads,num.network.threads, num.replica.fetchers 等參數來保證集羣之間快速複製和吞吐。


(3)數據一致性保證

Kafka 有本身一套獨特的消息傳輸保障機制(at least once)。當 producer 向 broker 發送消息時,因爲副本機制(replication)的存在,一旦這條消息被 broker 確認,它將不會丟失。但若是 producer 發送數據給 broker 後,遇到網絡問題而形成通訊中斷,那 producer 就沒法判斷該條消息是否已經被確認。這時 producer 能夠重試,確保消息已經被 broker 確認,爲了保證消息的可靠性,咱們要求業務作到:

• 保證發送端成功
當 producer 向 leader 發送數據時,能夠經過 request.required.acks 參數來設置數據可靠性的級別:

1(默認) leader 已成功收到的數據並獲得確認後發送下一條 message。若是 leader 宕機,則會丟失數據。
0 送端無需等待來自 broker 的確認而繼續發送下一批消息。這種狀況下數據傳輸效率最高,可是數據可靠性確是最低的。
-1(ALL) 發送端須要等待 ISR 列表中全部列表都確認接收數據後纔算一次發送完成,可靠性最高。

• 保證消費者消費成功(at least once)

咱們要求消費者關閉自動提交(enable.auto.commit:false),同時當消費者每次 poll 處理完業務邏輯後必須完成手動同步提交(commitSync),若是消費者在消費過程當中發生 crash,下次啓動時依然會從以前的位置開始消費,從而保證每次提交的內容都能被消費。

• 消息去重 

考慮到 producer,broker,consumer 之間都有可能形成消息重複,因此咱們要求接收端須要支持消息去重的功能,最好藉助業務消息自己的冪等性來作。其中有些大數據組件,如 hbase,elasticsearch 自然就支持冪等操做。

 

圖 4Kafka 消息可靠性機制

 

場景事例:行情數據 hbase 存儲
在華泰內部使用 kafka 來緩存一段時間的行情數據,並作相應處理爲了保證 kafka 中數據的完整性,發送端 API 參數配置:

props.put(「acks」, 「all」);

爲了防止某條發送影響後續的消息發送,採用帶異步回調的模式發送

 

在接收端,啓動專門的消費者拉取 kafka 數據存入 hbase。hbase 的 rowkey 的設計主要包括 SecurityId(股票id)和 timestamp(行情數據時間)。消費線程從 kafka 拉取數據後反序列化,而後批量插入 hbase,只有插入成功後才往 kafka 中持久化 offset。這樣的好處是,若是在中間任意一個階段發生報錯,程序恢復後都會從上一次持久化 offset 的位置開始消費數據,而不會形成數據丟失。若是中途有重複消費的數據,則插入 hbase 的 rowkey 是相同的,數據只會覆蓋不會重複,最終達到數據一致。


因此,從根本上說,kafka 上的數據傳輸也是數據最終一致性的典型場景。

 


圖 5hbase 持久化邏輯

(4)ACL安全


目前華泰內部經過配置 allow.everyone.if.no.acl.found 參數(:true)讓 Kafka 集羣同時具有 ACL 和非 ACL 的能力,避免資源的浪費。咱們選用 SASL 做爲 Kafka 鑑權方式,由於 SASL 雖然簡單,但已知足需求,而 Kerberos 使用太重,過分複雜組件會給 Kafka 帶來更多不肯定的因素,如示例所示,根據部門劃分來分配用戶。

示例:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
ser_dep1=「 password 1」
user_dep2=「 password 2」
user_dep3=「 password 3」;
};


服務啓動後,經過 Kafka 的 command line 接口,配置基於用戶、ip、topic、groupid 等的 acl 權限來保證各業務之間的隔離。

將來規劃

隨着業務的不斷髮展,Kafka 在華泰證券內部已成爲核心組件。將來重點進行 PaaS 平臺建設,創建分級保障和 ACL 權限管控,對重點業務進行獨立管理。目前 Kafka 的 topic 通常只有 2 個副本,在某些特殊場景下存在數據丟失的風險,將來咱們會經過升級擴容,基於業務的重要程度提高副本數,強化集羣的高可用性。後續咱們還會深刻研究 Kafka1.0,與 KafkaStreaming、KQL、Storm、Spark、Flink 等流式計算引擎相結合,依託 Kafka 打造公司級流式計算平臺。

相關文章
相關標籤/搜索