Apache Pulsar 在騰訊 Angel PowerFL 聯邦學習平臺上的實踐

騰訊 Angel PowerFL 聯邦學習平臺

聯邦學習做爲新一代人工智能基礎技術,經過解決數據隱私與數據孤島問題,重塑金融、醫療、城市安防等領域。git

騰訊 Angel PowerFL 聯邦學習平臺構建在 Angel 機器學習平臺上,利用 Angel-­PS 支持萬億級模型訓練的能力,將不少在 Worker 上的計算提高到 PS(參數服務器) 端;Angel PowerFL 爲聯邦學習算法提供了計算、加密、存儲、狀態同步等基本操做接口,經過流程調度模塊協調參與方任務執行狀態,而通訊模塊完成了任務訓練過程當中全部數據的傳輸。Angel PowerFL 聯邦學習已經在騰訊金融雲、騰訊廣告聯合建模等業務中開始落地,並取得初步的效果。github

Angel 機器學習平臺:https://github.com/Angel-ML 算法

image

Angel PowerFL 對聯邦通訊服務的要求

Angel PowerFL 聯邦學習平臺在訓練任務過程中,對參與方之間的消息通訊要求極高,要求消息系統必須穩定可靠、保持高性能且能保證數據安全。Angel PowerFL 的學習任務在訓練過程中,參與方之間會有大量的加密數據經過通訊模塊傳輸,Angel PowerFL 對通訊服務有如下需求:docker

➡️ 穩定可靠apache

Angel PowerFL 的學習任務時長從幾分鐘到幾小時,算法執行對數據的準確性要求很高,不一樣算法的數據傳輸峯值也不同,這須要通訊模塊的服務足夠穩定,而且不能丟數據。安全

➡️ 高性能傳輸服務器

Angel PowerFL 底層經過 Spark 進行計算,Executor 併發執行會產生不少待傳輸的中間數據,通訊模塊須要將這些加密後的數據及時傳輸給對方,這就要求通訊服務作到低延時、高吞吐量。網絡

➡️ 數據安全架構

雖然 Angel PowerFL 全部數據都經過加密模塊進行了加密,但參與聯邦學習的業務可能分佈在不一樣公司;跨公網進行傳輸,須要通訊模塊足夠安全,不易被攻擊。併發

爲何選擇 Pulsar

聯邦通訊服務在作技術預研的時候,考慮過 RPC 直連、HDFS 同步、MQ 同步三種技術方案。考慮到對安全和性能的要求比較高,排除了 RPC 直連和 HDFS 同步方案,肯定採用 MQ 同步方案。

MQ 可選的服務不少,好比 Pulsar、Kafka、RabbitMQ、TubeMQ 等。考慮到 Angel PowerFL 對穩定性、可靠性、高性能傳輸和數據安全有很高的需求,咱們諮詢了騰訊數據平臺部 MQ 團隊,他們向咱們推薦了 Pulsar。

隨後,咱們對 Pulsar 開展了深刻調研,發現 Pulsar 內置的諸多特性,正好知足了咱們對消息系統的要求。Pulsar broker 和 bookie 採用了計算存儲分層架構,保證了數據穩定可靠,性能良好;Pulsar 支持跨地域複製(geo­-replication),解決了 PowerFL 跨聯邦同步 MQ 問題;而 Pulsar 的驗證和受權模式也能保證傳輸安全。

雲原生的計算與存儲分層架構

Apache Pulsar 是下一代雲原生分佈式消息和事件流平臺,採用了計算和存儲分層的架構:在 Broker 上進行 Pub/Sub 相關的計算,在 Apache BookKeeper 上存儲數據。

和傳統的消息平臺(如 Kafka)相比,這種架構有明顯的優點:

  • Broker 和 bookie 相互獨立,能夠獨立擴展和容錯,提高系統的可用性。
  • 分區存儲不受單個節點存儲容量的限制,數據分佈更均勻。
  • BookKeeper 存儲安全可靠,保證消息不丟失,同時支持批量刷盤以得到更高吞吐量。

image

Pulsar Geo­-replication

Pulsar 原生支持跨地域複製(Geo­-replication),能夠在多個數據中心的多個 Pulsar 集羣中同時同步/異步複製數據。還能夠在消息級別,經過 setReplicationClusters 控制消息複製到哪些集羣。

image

在上圖中,不管 Producer P一、P2 和 P3 在何時分別將消息發佈給 Cluster A、Cluster B 和 Cluster C 中的 topic T1,這些消息均會馬上覆制到整個集羣。一旦完成複製,Consumer C1 和 C2 便可從本身所在的集羣消費這些消息。

水平擴展

因爲 Pulsar 的存儲設計基於分片,Pulsar 把主題分區劃分爲更小的塊,稱其爲分片。每一個分片都做爲 Apache BookKeeper ledger 來存儲,這樣構成分區的分片集合分佈在 Apache BookKeeper 集羣中。這樣設計方便咱們管理容量和水平擴展,而且知足高吞吐量的需求。

  • 容量管理簡單:主題分區的容量能夠擴展至整個 BookKeeper 集羣的容量,不受單個節點容量的限制。
  • 擴容簡單:擴容無需從新平衡或複製數據。添加新存儲節點時,新節點僅用於新分片或其副本,Pulsar 自動平衡分片分佈和集羣中的流量。
  • 高吞吐量:寫入流量分佈在存儲層中,不會出現分區寫入爭用單個節點資源的狀況。

通過深刻調研後,咱們決定在騰訊 Angel PowerFL 聯邦學習平臺上使用 Apache Pulsar。

基於 Apache Pulsar 的聯邦通訊方案

聯邦學習的各個業務(Angel PowerFL 稱之爲 Party,每一個 Party 有不一樣的 ID,如 10000/20000),可能分佈在同個公司的不一樣部門(無網絡隔離),也可能分佈在不一樣公司(跨公網),各個 Party 之間經過 Pulsar 跨地域複製功能進行同步複製,整體設計方案以下:

image

聯邦學習的每一個訓練任務,經過消息的 producer 和 consumer 鏈接所在 Party 的 Pulsar 集羣,集羣名以 fl-pulsar-[partyID] 進行區分,訓練任務產生須要傳輸的中間數據後,生產者將這些數據發送給本地 Pulsar 集羣。

Pulsar 集羣收到數據後,經過 Pulsar proxy 創建的同步複製網絡通道,將數據發送給使用方 Party。而使用方 Party 的消費者,會一直監聽該訓練任務對應的 topic,當有數據到達後,直接消費數據進行下一步的計算。

image

在 Angel PowerFL 執行訓練任務時,driver 和每一個 partition 會建立一個 channel 類型變量,該變量和 Pulsar 當中具體的 topic 一一對應,須要交換的數據都會通過生產者發送到這個 topic。

Angel PowerFL 支持多方聯邦,所以會有 2+ 個 Pulsar 集羣須要同步複製數據。每一個聯邦學習任務經過各自的 parties 任務參數指定了參與方,生產者在發送消息時調用 setReplicationClusters 接口,確保數據只在參與 Party 之間傳輸。

在 Angel PowerFL 的通訊模塊中,咱們充分利用了 Pulsar 的 geo-­replication、topic 限流、Token Authentication 等功能。下面我來詳細介紹如何在 Angel PowerFL 聯邦學習平臺中使用 Pulsar。

Geo­-replication 去掉Global ZooKeeper 依賴

在 Angel PowerFL 聯邦學習平臺上,部署一套完整的 Pulsar 依賴兩個 ZooKeeper 集羣,分別是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 做用相似,用來存儲元數據。而 Global ZooKeeper 則在 Pulsar 多個集羣間中共享配置信息。

image

在 Angel PowerFL 場景中,每一個 Party 加入前,都要先部署一個 Global ZooKeeper 的子節點,或者共用一套跨公司或跨地域的公共 ZooKeeper,這樣不只會增長部署的難度,也會增長被攻擊的風險,不利於新 Party 加入。

Global ZooKeeper 中存儲的元數據,主要是集羣名/服務地址/namespace 權限等信息。Pulsar 支持建立和加入新集羣。咱們經過如下兩個步驟註冊聯邦 Pulsar 集羣的信息到 local ZooKeeper,就去除了對 Global ZooKeeper 的依賴:

步驟 1: 註冊新加入 Party 的 Pulsar 集羣

# OTHER_CLUSTER_NAME 爲待註冊 Party 的 Pulsar 集羣名
# OTHER_CLUSTER_BROKER_URL爲 Pulsar 集羣對應的 broker 地址
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} 
 --url http://${OTHER_CLUSTER_HTTP_URL} 
 --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}

步驟 2: 授予訓練用到的 namespace 訪問集羣權限

./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} 
 -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}

對於新加入的 Party,只用提供與其對應的 Pulsar 的集羣名/服務地址便可完成註冊,geo-replication 就能夠經過註冊信息同步複製數據。

Client 增長 Token 認證

Pulsar 做爲 Angel PowerFL 的通訊模塊,沒有加入用戶級別的權限控制。爲了進一步保證 client 生產和消費數據的安全,咱們參考 Pulsar Client authentication using tokens based on JSON Web Tokens 增長了 token 認證,Angel PowerFL 的訓練任務除了配置當前 Party 使用的服務地址外,還須要配置 admin token。

https://pulsar.apache.org/doc...
因爲 Angel PowerFL 整套系統部署在 Kubernetes 上,咱們經過容器準備 Pulsar 集羣須要的 Public/Private keys 等文件,而後註冊到 K8S secret 中。

# 生成 fl-private.key 和 fl-public.key
docker run --rm -v "$(pwd)":/tmp 
 apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create-key-pair --output-private-key 
 /tmp/fl-private.key --output-public-key /tmp/fl-public.key
# 生成 admin-token.txt token 文件
echo -n `docker run --rm -v 
 "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create --private-key 
 file:///tmp/fl-private.key --subject admin`
# 將認證相關的文件註冊到 K8S
kubectl create secret generic token-symmetric-key 
 --from-file=TOKEN=admin-token.txt 
 --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}

開啓多集羣 topic 自動回收

Pulsar 集羣開啓了 geo-­replication 功能後,沒法經過命令直接刪除用過的 topic,而 Angel PowerFL 訓練任務每次使用的任務是一次性的,任務結束後這些 topic 就沒用了,若是不及時刪除會出現大量累積。

對於經過 geo­-replication 開啓複製的 topic,能夠配置 brokerDeleteInactivetopicsEnabled 參數,開啓 topic 自動回收。自動回收無用的 topic,需知足如下幾個條件:

  • 當前 topic 沒有生產者( producer)或者消費者(consumer)鏈接
  • 當前 topic 沒有被訂閱
  • 當前 topic 沒有須要保留的信息

Angel PowerFL 部署的 Pulsar 集羣,經過 brokerDeleteInactivetopicsEnabled 開啓 topic 自動回收。在執行訓練任務的過程當中,使用後對每一個 topic 按回收條件進行處理。同時,咱們增長了

brokerDeleteInactivetopicsFrequencySeconds 配置,將回收的頻率設置爲 3 小時。

優化 topic 限流

Angel PowerFL 中的訓練任務,在不一樣的數據集/算法/執行階段,生產數據的流量峯值也不一樣。目前生產環境中單個任務最大的數據量超過 200G/小時。訓練過程當中,若是 Pulsar 鏈接中斷或者生產和消費過程出現異常,須要從新開始整個訓練任務。

爲了規避 Pulsar 集羣被單個訓練任務沖垮的風險,咱們使用了 Pulsar 的限流功能。Pulsar 支持 message-rate 和 byte-rate 兩種生產限流策略,前者限制每秒生產消息的數量,後者限制每秒生產消息的大小。Angel PowerFL 將數據切分紅多個 4M 的消息,經過 message-­rate 限制生產消息的數量。在 Angel PowerFL 中,咱們將 namespace 的消息限制爲 30 條(小於<30*4=120M/s):

./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30

剛開始測試 message-rate 的限流功能時,出現了限不住的狀況(限流設置失效)。騰訊數據平臺部 MQ 團隊負責 Pulsar 的同事幫忙一塊兒排查,發現設置 topicPublisherThrottlingTickTimeMillis 參數後,限制不能生效。

所以咱們想辦法在 broker 端啓用了精確的 topic 發佈頻率限制,優化了限流功能並貢獻回社區,詳情見 PR-7078: introduce precise topic publish rate limiting。
https://github.com/apache/pul...

優化 topic unloading 配置

Pulsar 根據 broker 集羣負載情況,能夠將 topic 動態分配到 broker上。若是擁有該 topic 的broker 宕機,或者擁有該 topic 的 broker 負載過大,則該 topic 會當即從新分配給另外一個 broker ;而從新分配的過程就是 topic 的 unloading,該操做意味着關閉 topic,釋放全部者(owner)。

理論上,topic unloading 由負載均衡調整,客戶端將經歷極小的延遲抖動,一般耗時 10ms 左右。但 Angel PowerFL 初期在執行訓練任務時,日誌爆出大量由於 unloading topic 致使的鏈接異常。日誌顯示 topic unloading 在不斷的重試,但都不成功:

[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s

先來看 broker/namespace/bundle/topic 這四者的關係。Bundle 是 Pulsar namespace 的一個分片機制,namespace 被分片爲 bundle 列表,每一個 bundle 包含 namespace 的整個哈希範圍的一部分。Topic 不直接分配給 broker,而是經過計算 topic 的哈希碼將 topic 分配給特定的 bundle;每一個 bundle 互相獨立,再被分配到不一樣的 broker 上。

Angel PowerFL 早期的任務 topic 沒有複用,一個 LR 算法訓練任務建立了 2000 多個 topic,每一個 topic 生產的數據負載也不一樣,咱們判斷上述斷連問題是因爲短期內(最小任務十分鐘內能結束,同時會有多個任務在運行)大量建立和使用 topic,致使負載不均衡,topic unloading 頻繁發生。爲了下降 topic unloading 的頻率,咱們調整了 Pulsar Bundle 的相關參數:

# 增長 broker 可最大分配 topic 數量
loadBalancerBrokerMaxTopics=500000
# 啓用自動拆分namespace bundle
loadBalancerAutoBundleSplitEnabled=true
# 增長觸發拆分 bundle 的 topic 數量
loadBalancerNamespaceBundleMaxTopics=10000
# 增長觸發拆分 bundle 的消息數
loadBalancerNamespaceBundleMaxMsgRate=10000

同時,在建立 namespace 時,把 bundle 數量默認設置爲 64。

./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64

通過以上調整,Angel PowerFL 在任務執行期間沒有再出現過因爲 topic unloading 致使的斷連。

Pulsar on Kubernetes

Angel PowerFL 的全部服務均經過 Helm 部署在 Kubernetes 上。Pulsar 做爲其中的一個 chart,能夠很好的利用 K8S 的資源隔離、快速擴縮容等特性。在 Angel PowerFL 使用 Helm 部署 Pulsar 的實踐中,咱們總結了如下經驗:

🎙️ 使用 Local Persistent Volume 做爲存儲

Pulsar 是 IO 敏感的服務,尤爲 bookie 組件,在生產環境中建議使用 SSD 或獨立的磁盤。Angel PowerFL 在跑一些大數據集任務時,Pulsar 常常出現 「No Bookies Available」 的異常。這期間磁盤的 IO 使用率很高。

咱們經過 Local Persistent Volume 將 bookie 和 ZooKeeper 等其它組件掛載到單獨的磁盤,減緩了磁盤 IO 競爭。咱們也測試過將 Pulsar 的 PV 存儲換成 Ceph 和 NFS,性能都沒有直接使用 Local Persistent Volume 好。

🎙️ 使用 NodeSelector

Geo-replication 同步複製數據期間,broker 須要訪問對方的 Pulsar proxy 容器。Angel PowerFL 將網關機單獨打了標籤,經過 NodeSelector 將 broker 安裝在可訪問外網的網關機上。

🎙️ 配置 useHostNameAsBookieID

Bookie 是有狀態的組件,爲了 bookie pod 重建後服務正常,須要配置 useHostNameAsBookieID,確保向 ZooKeeper 註冊的 ID 是 pod 的 hostname。

將來計劃

Angel PowerFL 目前使用 Pulsar 快一年了,穩定運行時間最長的集羣已經超過半年,將來對Pulsar 的使用計劃主要有兩個。

👍 升級 Pulsar 到 2.6.x 版本

咱們目前使用的是 Pulsar 2.5.2 版本,因爲最近會使用 Pulsar Key_Shared 功能作 Angel-PS 的容災恢復。2.6.0 版本恰好有加強 Key_Shared 訂閱模式,因此咱們預計將來一個月升級到 Pulsar 2.6.x。
https://github.com/apache/pul...

👍 Pulsar on K8S 支持多磁盤掛載

Angel PowerFL 全部服務都運行在 Kubernetes 上(除了任務使用的 YARN 計算資源),Pulsar 做爲其中的一個 chart 和其它服務一塊兒部署,使用 Local Persistent Volume 做爲存儲。但目前 bookie 只支持掛載一塊磁盤(目錄),對於多磁盤的機器沒有更充分的利用,咱們計劃增長該特性。

總結

咱們介紹了在人工智能應用場景下,使用 Pulsar 做爲 Angel PowerFL 通訊模塊的相關實踐。在方案實現過程中,咱們充分使用了 Pulsar 諸多內置特性,並根據自身需求作了相關優化,如 geo-­replication 去掉 Global ZooKeeper 依賴,爲 client 增長 token 認證,開啓多集羣 topic 自動回收,優化 topic 限流功能和 topic unloading 配置等。

Pulsar 做爲下一代雲原生分佈式消息和流平臺,有衆多吸引人的功能,在直播與短視頻、零售與電子商務、媒體、金融等行業有普遍應用,期待 Pulsar 在不一樣的應用場景下不斷有新的案例落地。

致 謝

特別感謝騰訊數據平臺部 MQ 團隊,在 Angel PowerFL 平臺使用 Pulsar 過程當中給與的技術指導。該團隊在 Apache Pulsar 和 TubeMQ 上有多年的技術積累,積極爲 Pulsar 社區作出了巨大貢獻。Pulsar 社區十分活躍,正處於快速成長之中。咱們會持續關注並和 Apache Pulsar 社區深刻合做,把優化的功能奉獻給 Pulsar 社區,和社區其餘用戶一塊兒進一步完善、優化 Pulsar 的特性和功能,共同建設一個更強大完善的 Pulsar 社區。

做者簡介

張超,騰訊數據平臺部高級工程師,負責 Angel PowerFL 聯邦通訊/PowerFL on K8S 等工做。他和騰訊數據平臺部 MQ 團隊一塊兒將 Apache Pulsar 引入 PowerFL 聯邦學習平臺,開啓了 Pulsar 在機器學習領域的應用。

相關文章
相關標籤/搜索