聯邦學習做爲新一代人工智能基礎技術,經過解決數據隱私與數據孤島問題,重塑金融、醫療、城市安防等領域。git
騰訊 Angel PowerFL 聯邦學習平臺構建在 Angel 機器學習平臺上,利用 Angel-PS 支持萬億級模型訓練的能力,將不少在 Worker 上的計算提高到 PS(參數服務器) 端;Angel PowerFL 爲聯邦學習算法提供了計算、加密、存儲、狀態同步等基本操做接口,經過流程調度模塊協調參與方任務執行狀態,而通訊模塊完成了任務訓練過程當中全部數據的傳輸。Angel PowerFL 聯邦學習已經在騰訊金融雲、騰訊廣告聯合建模等業務中開始落地,並取得初步的效果。github
Angel 機器學習平臺:https://github.com/Angel-ML 算法
Angel PowerFL 聯邦學習平臺在訓練任務過程中,對參與方之間的消息通訊要求極高,要求消息系統必須穩定可靠、保持高性能且能保證數據安全。Angel PowerFL 的學習任務在訓練過程中,參與方之間會有大量的加密數據經過通訊模塊傳輸,Angel PowerFL 對通訊服務有如下需求:docker
➡️ 穩定可靠apache
Angel PowerFL 的學習任務時長從幾分鐘到幾小時,算法執行對數據的準確性要求很高,不一樣算法的數據傳輸峯值也不同,這須要通訊模塊的服務足夠穩定,而且不能丟數據。安全
➡️ 高性能傳輸服務器
Angel PowerFL 底層經過 Spark 進行計算,Executor 併發執行會產生不少待傳輸的中間數據,通訊模塊須要將這些加密後的數據及時傳輸給對方,這就要求通訊服務作到低延時、高吞吐量。網絡
➡️ 數據安全架構
雖然 Angel PowerFL 全部數據都經過加密模塊進行了加密,但參與聯邦學習的業務可能分佈在不一樣公司;跨公網進行傳輸,須要通訊模塊足夠安全,不易被攻擊。併發
聯邦通訊服務在作技術預研的時候,考慮過 RPC 直連、HDFS 同步、MQ 同步三種技術方案。考慮到對安全和性能的要求比較高,排除了 RPC 直連和 HDFS 同步方案,肯定採用 MQ 同步方案。
MQ 可選的服務不少,好比 Pulsar、Kafka、RabbitMQ、TubeMQ 等。考慮到 Angel PowerFL 對穩定性、可靠性、高性能傳輸和數據安全有很高的需求,咱們諮詢了騰訊數據平臺部 MQ 團隊,他們向咱們推薦了 Pulsar。
隨後,咱們對 Pulsar 開展了深刻調研,發現 Pulsar 內置的諸多特性,正好知足了咱們對消息系統的要求。Pulsar broker 和 bookie 採用了計算存儲分層架構,保證了數據穩定可靠,性能良好;Pulsar 支持跨地域複製(geo-replication),解決了 PowerFL 跨聯邦同步 MQ 問題;而 Pulsar 的驗證和受權模式也能保證傳輸安全。
Apache Pulsar 是下一代雲原生分佈式消息和事件流平臺,採用了計算和存儲分層的架構:在 Broker 上進行 Pub/Sub 相關的計算,在 Apache BookKeeper 上存儲數據。
和傳統的消息平臺(如 Kafka)相比,這種架構有明顯的優點:
Pulsar 原生支持跨地域複製(Geo-replication),能夠在多個數據中心的多個 Pulsar 集羣中同時同步/異步複製數據。還能夠在消息級別,經過 setReplicationClusters 控制消息複製到哪些集羣。
在上圖中,不管 Producer P一、P2 和 P3 在何時分別將消息發佈給 Cluster A、Cluster B 和 Cluster C 中的 topic T1,這些消息均會馬上覆制到整個集羣。一旦完成複製,Consumer C1 和 C2 便可從本身所在的集羣消費這些消息。
因爲 Pulsar 的存儲設計基於分片,Pulsar 把主題分區劃分爲更小的塊,稱其爲分片。每一個分片都做爲 Apache BookKeeper ledger 來存儲,這樣構成分區的分片集合分佈在 Apache BookKeeper 集羣中。這樣設計方便咱們管理容量和水平擴展,而且知足高吞吐量的需求。
通過深刻調研後,咱們決定在騰訊 Angel PowerFL 聯邦學習平臺上使用 Apache Pulsar。
聯邦學習的各個業務(Angel PowerFL 稱之爲 Party,每一個 Party 有不一樣的 ID,如 10000/20000),可能分佈在同個公司的不一樣部門(無網絡隔離),也可能分佈在不一樣公司(跨公網),各個 Party 之間經過 Pulsar 跨地域複製功能進行同步複製,整體設計方案以下:
聯邦學習的每一個訓練任務,經過消息的 producer 和 consumer 鏈接所在 Party 的 Pulsar 集羣,集羣名以 fl-pulsar-[partyID] 進行區分,訓練任務產生須要傳輸的中間數據後,生產者將這些數據發送給本地 Pulsar 集羣。
Pulsar 集羣收到數據後,經過 Pulsar proxy 創建的同步複製網絡通道,將數據發送給使用方 Party。而使用方 Party 的消費者,會一直監聽該訓練任務對應的 topic,當有數據到達後,直接消費數據進行下一步的計算。
在 Angel PowerFL 執行訓練任務時,driver 和每一個 partition 會建立一個 channel 類型變量,該變量和 Pulsar 當中具體的 topic 一一對應,須要交換的數據都會通過生產者發送到這個 topic。
Angel PowerFL 支持多方聯邦,所以會有 2+ 個 Pulsar 集羣須要同步複製數據。每一個聯邦學習任務經過各自的 parties 任務參數指定了參與方,生產者在發送消息時調用 setReplicationClusters
接口,確保數據只在參與 Party 之間傳輸。
在 Angel PowerFL 的通訊模塊中,咱們充分利用了 Pulsar 的 geo-replication、topic 限流、Token Authentication 等功能。下面我來詳細介紹如何在 Angel PowerFL 聯邦學習平臺中使用 Pulsar。
在 Angel PowerFL 聯邦學習平臺上,部署一套完整的 Pulsar 依賴兩個 ZooKeeper 集羣,分別是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 做用相似,用來存儲元數據。而 Global ZooKeeper 則在 Pulsar 多個集羣間中共享配置信息。
在 Angel PowerFL 場景中,每一個 Party 加入前,都要先部署一個 Global ZooKeeper 的子節點,或者共用一套跨公司或跨地域的公共 ZooKeeper,這樣不只會增長部署的難度,也會增長被攻擊的風險,不利於新 Party 加入。
Global ZooKeeper 中存儲的元數據,主要是集羣名/服務地址/namespace 權限等信息。Pulsar 支持建立和加入新集羣。咱們經過如下兩個步驟註冊聯邦 Pulsar 集羣的信息到 local ZooKeeper,就去除了對 Global ZooKeeper 的依賴:
步驟 1: 註冊新加入 Party 的 Pulsar 集羣
# OTHER_CLUSTER_NAME 爲待註冊 Party 的 Pulsar 集羣名 # OTHER_CLUSTER_BROKER_URL爲 Pulsar 集羣對應的 broker 地址 ./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} --url http://${OTHER_CLUSTER_HTTP_URL} --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}
步驟 2: 授予訓練用到的 namespace 訪問集羣權限
./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}
對於新加入的 Party,只用提供與其對應的 Pulsar 的集羣名/服務地址便可完成註冊,geo-replication 就能夠經過註冊信息同步複製數據。
Pulsar 做爲 Angel PowerFL 的通訊模塊,沒有加入用戶級別的權限控制。爲了進一步保證 client 生產和消費數據的安全,咱們參考 Pulsar Client authentication using tokens based on JSON Web Tokens 增長了 token 認證,Angel PowerFL 的訓練任務除了配置當前 Party 使用的服務地址外,還須要配置 admin token。
https://pulsar.apache.org/doc...
因爲 Angel PowerFL 整套系統部署在 Kubernetes 上,咱們經過容器準備 Pulsar 集羣須要的 Public/Private keys 等文件,而後註冊到 K8S secret 中。
# 生成 fl-private.key 和 fl-public.key docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 /pulsar/bin/pulsar tokens create-key-pair --output-private-key /tmp/fl-private.key --output-public-key /tmp/fl-public.key # 生成 admin-token.txt token 文件 echo -n `docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 /pulsar/bin/pulsar tokens create --private-key file:///tmp/fl-private.key --subject admin` # 將認證相關的文件註冊到 K8S kubectl create secret generic token-symmetric-key --from-file=TOKEN=admin-token.txt --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}
Pulsar 集羣開啓了 geo-replication 功能後,沒法經過命令直接刪除用過的 topic,而 Angel PowerFL 訓練任務每次使用的任務是一次性的,任務結束後這些 topic 就沒用了,若是不及時刪除會出現大量累積。
對於經過 geo-replication 開啓複製的 topic,能夠配置 brokerDeleteInactivetopicsEnabled
參數,開啓 topic 自動回收。自動回收無用的 topic,需知足如下幾個條件:
Angel PowerFL 部署的 Pulsar 集羣,經過 brokerDeleteInactivetopicsEnabled 開啓 topic 自動回收。在執行訓練任務的過程當中,使用後對每一個 topic 按回收條件進行處理。同時,咱們增長了
brokerDeleteInactivetopicsFrequencySeconds 配置,將回收的頻率設置爲 3 小時。
Angel PowerFL 中的訓練任務,在不一樣的數據集/算法/執行階段,生產數據的流量峯值也不一樣。目前生產環境中單個任務最大的數據量超過 200G/小時。訓練過程當中,若是 Pulsar 鏈接中斷或者生產和消費過程出現異常,須要從新開始整個訓練任務。
爲了規避 Pulsar 集羣被單個訓練任務沖垮的風險,咱們使用了 Pulsar 的限流功能。Pulsar 支持 message-rate 和 byte-rate 兩種生產限流策略,前者限制每秒生產消息的數量,後者限制每秒生產消息的大小。Angel PowerFL 將數據切分紅多個 4M 的消息,經過 message-rate 限制生產消息的數量。在 Angel PowerFL 中,咱們將 namespace 的消息限制爲 30 條(小於<30*4=120M/s):
./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30
剛開始測試 message-rate 的限流功能時,出現了限不住的狀況(限流設置失效)。騰訊數據平臺部 MQ 團隊負責 Pulsar 的同事幫忙一塊兒排查,發現設置 topicPublisherThrottlingTickTimeMillis 參數後,限制不能生效。
所以咱們想辦法在 broker 端啓用了精確的 topic 發佈頻率限制,優化了限流功能並貢獻回社區,詳情見 PR-7078: introduce precise topic publish rate limiting。
https://github.com/apache/pul...
Pulsar 根據 broker 集羣負載情況,能夠將 topic 動態分配到 broker上。若是擁有該 topic 的broker 宕機,或者擁有該 topic 的 broker 負載過大,則該 topic 會當即從新分配給另外一個 broker ;而從新分配的過程就是 topic 的 unloading,該操做意味着關閉 topic,釋放全部者(owner)。
理論上,topic unloading 由負載均衡調整,客戶端將經歷極小的延遲抖動,一般耗時 10ms 左右。但 Angel PowerFL 初期在執行訓練任務時,日誌爆出大量由於 unloading topic 致使的鏈接異常。日誌顯示 topic unloading 在不斷的重試,但都不成功:
[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s
先來看 broker/namespace/bundle/topic 這四者的關係。Bundle 是 Pulsar namespace 的一個分片機制,namespace 被分片爲 bundle 列表,每一個 bundle 包含 namespace 的整個哈希範圍的一部分。Topic 不直接分配給 broker,而是經過計算 topic 的哈希碼將 topic 分配給特定的 bundle;每一個 bundle 互相獨立,再被分配到不一樣的 broker 上。
Angel PowerFL 早期的任務 topic 沒有複用,一個 LR 算法訓練任務建立了 2000 多個 topic,每一個 topic 生產的數據負載也不一樣,咱們判斷上述斷連問題是因爲短期內(最小任務十分鐘內能結束,同時會有多個任務在運行)大量建立和使用 topic,致使負載不均衡,topic unloading 頻繁發生。爲了下降 topic unloading 的頻率,咱們調整了 Pulsar Bundle 的相關參數:
# 增長 broker 可最大分配 topic 數量 loadBalancerBrokerMaxTopics=500000 # 啓用自動拆分namespace bundle loadBalancerAutoBundleSplitEnabled=true # 增長觸發拆分 bundle 的 topic 數量 loadBalancerNamespaceBundleMaxTopics=10000 # 增長觸發拆分 bundle 的消息數 loadBalancerNamespaceBundleMaxMsgRate=10000
同時,在建立 namespace 時,把 bundle 數量默認設置爲 64。
./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64
通過以上調整,Angel PowerFL 在任務執行期間沒有再出現過因爲 topic unloading 致使的斷連。
Angel PowerFL 的全部服務均經過 Helm 部署在 Kubernetes 上。Pulsar 做爲其中的一個 chart,能夠很好的利用 K8S 的資源隔離、快速擴縮容等特性。在 Angel PowerFL 使用 Helm 部署 Pulsar 的實踐中,咱們總結了如下經驗:
🎙️ 使用 Local Persistent Volume 做爲存儲
Pulsar 是 IO 敏感的服務,尤爲 bookie 組件,在生產環境中建議使用 SSD 或獨立的磁盤。Angel PowerFL 在跑一些大數據集任務時,Pulsar 常常出現 「No Bookies Available」 的異常。這期間磁盤的 IO 使用率很高。
咱們經過 Local Persistent Volume 將 bookie 和 ZooKeeper 等其它組件掛載到單獨的磁盤,減緩了磁盤 IO 競爭。咱們也測試過將 Pulsar 的 PV 存儲換成 Ceph 和 NFS,性能都沒有直接使用 Local Persistent Volume 好。
🎙️ 使用 NodeSelector
Geo-replication 同步複製數據期間,broker 須要訪問對方的 Pulsar proxy 容器。Angel PowerFL 將網關機單獨打了標籤,經過 NodeSelector 將 broker 安裝在可訪問外網的網關機上。
🎙️ 配置 useHostNameAsBookieID
Bookie 是有狀態的組件,爲了 bookie pod 重建後服務正常,須要配置 useHostNameAsBookieID,確保向 ZooKeeper 註冊的 ID 是 pod 的 hostname。
Angel PowerFL 目前使用 Pulsar 快一年了,穩定運行時間最長的集羣已經超過半年,將來對Pulsar 的使用計劃主要有兩個。
👍 升級 Pulsar 到 2.6.x 版本
咱們目前使用的是 Pulsar 2.5.2 版本,因爲最近會使用 Pulsar Key_Shared 功能作 Angel-PS 的容災恢復。2.6.0 版本恰好有加強 Key_Shared 訂閱模式,因此咱們預計將來一個月升級到 Pulsar 2.6.x。
https://github.com/apache/pul...
👍 Pulsar on K8S 支持多磁盤掛載
Angel PowerFL 全部服務都運行在 Kubernetes 上(除了任務使用的 YARN 計算資源),Pulsar 做爲其中的一個 chart 和其它服務一塊兒部署,使用 Local Persistent Volume 做爲存儲。但目前 bookie 只支持掛載一塊磁盤(目錄),對於多磁盤的機器沒有更充分的利用,咱們計劃增長該特性。
咱們介紹了在人工智能應用場景下,使用 Pulsar 做爲 Angel PowerFL 通訊模塊的相關實踐。在方案實現過程中,咱們充分使用了 Pulsar 諸多內置特性,並根據自身需求作了相關優化,如 geo-replication 去掉 Global ZooKeeper 依賴,爲 client 增長 token 認證,開啓多集羣 topic 自動回收,優化 topic 限流功能和 topic unloading 配置等。
Pulsar 做爲下一代雲原生分佈式消息和流平臺,有衆多吸引人的功能,在直播與短視頻、零售與電子商務、媒體、金融等行業有普遍應用,期待 Pulsar 在不一樣的應用場景下不斷有新的案例落地。
特別感謝騰訊數據平臺部 MQ 團隊,在 Angel PowerFL 平臺使用 Pulsar 過程當中給與的技術指導。該團隊在 Apache Pulsar 和 TubeMQ 上有多年的技術積累,積極爲 Pulsar 社區作出了巨大貢獻。Pulsar 社區十分活躍,正處於快速成長之中。咱們會持續關注並和 Apache Pulsar 社區深刻合做,把優化的功能奉獻給 Pulsar 社區,和社區其餘用戶一塊兒進一步完善、優化 Pulsar 的特性和功能,共同建設一個更強大完善的 Pulsar 社區。
張超,騰訊數據平臺部高級工程師,負責 Angel PowerFL 聯邦通訊/PowerFL on K8S 等工做。他和騰訊數據平臺部 MQ 團隊一塊兒將 Apache Pulsar 引入 PowerFL 聯邦學習平臺,開啓了 Pulsar 在機器學習領域的應用。