關於 Apache Pulsar
Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代雲原生分佈式消息流平臺,集消息、存儲、輕量化函數式計算爲一體,採用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據複製,具備強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。
GitHub 地址:http://github.com/apache/pulsar/git
案例導讀:本案例介紹了清華大學能源互聯網創新研究院將 Apache Pulsar 落地能源互聯網方向的實踐。Pulsar 的雲原生架構、Schema、Functions 等特性知足了相關業務需求,也減輕了他們開發和運維負擔。github
閱讀本文須要大約 8 分鐘。算法
能源互聯網是電力與能源工業發展的方向。隨着信息、通訊和互聯網技術的飛速發展,可獲取的數據量正以爆炸式方式迅猛增加,傳統的數據處理方法已難以應對這些海量且增加極快的信息資產,大數據理論正是在這樣的狀態下應運而生。大數據處理技術能幫助咱們透過海量數據快速分辨其運行狀態及發展趨勢,在紛繁的世界中獨具洞察力。數據庫
清華大學能源互聯網創新研究院能源大數據與開放生態研究中心聚集了國內外能源及電力大數據領域的多位專家,致力於推進大數據基礎理論和實踐應用的全面創新。能源大數據與開放生態研究中心將大數據技術應用於能源互聯網、智能電網和智慧用能等工程場景,結合高性能優化、並行計算和人工智能等先進技術,研發適用於能源電力行業特色的大數據 / 雲計算平臺,和基於數據驅動的能源電力系統的高級應用,從而實現大數據產業的發展,造成以數據爲核心的新型產業鏈,推進我國能源產業的轉型與升級。apache
咱們團隊的業務主要是與電力相關的物聯網場景,旨在實現用戶對傳感器等設備數據的需求開發。咱們團隊規模較小,但任務繁雜,但願能更快更穩地實現客戶的需求。後端
在整理業務需求後,咱們提出之後端即服務(BaaS)爲主、基於消息的服務方案。在物聯網領域內,基於這樣的解決方案,咱們能夠共用更多基礎設施服務,同時能夠快速應對不一樣需求進行業務開發。考慮到特殊的業務需求,咱們的平臺須要具有如下特性:安全
以前咱們使用基於 RabbitMQ 和 Celery 的方案來實現用戶自定義 functions 的函數引擎。這一方案的最初使用效果良好,但隨着業務的增加,問題愈來愈多。咱們的小團隊不得不花更多時間來解決問題和優化總體方案。當 Celery 做爲任務隊列時,這些問題尤其嚴重。性能優化
咱們花費大量的時間和精力處理的問題主要有兩個:數據結構
此外,在特殊場景中,若是單個消息比較大且消息處理時間長時,Celery 和 RabbitMQ 的內存負擔都比較大。架構
隨着客戶數量和項目數量的增長,這些問題變得日益突出,咱們決定找一個新產品替代原有方案。
如上所述,咱們但願消息中間件能夠提供如下特性:
在調研不一樣的消息中間件時,咱們很快發現了 Pulsar。經過 Pulsar 的文檔和發佈日誌,咱們瞭解到 Pulsar 有不少優秀的特性,因此決定對 Pulsar 進行測試和評估。通過深刻研究、學習,咱們發現 Pulsar 的雲原生架構、Schema、Functions 等很是適合咱們的業務需求。
深刻了解 Pulsar 後,咱們決定對 Pulsar 進行測試,並嘗試遷移一個生產環境的應用。
樓宇智慧用電是咱們在用電分析和預測領域作的一次嘗試,咱們但願採集到辦公室中每個用電點的用電信息。在研究院新辦公樓裝修初期,咱們進行了技術評估,將使用 zigbee 協議的智能插座列入了裝修方案。整個部署包含三層樓,約 700 個智能插座和 50 個 zigbee 網關。插座部署在辦公場所的全部用電點,包含工位插座、牆壁插座以及中央空調風機插座。全部數據經過智能插座廠商提供的局域網廣播方案,將廣播數據轉發到 Pulsar 中實現數據點的採集和預處理。目前用電量數據每 10 秒鐘上送一次,其餘與用戶相關的操做(包括開關插座、插拔用電設備)則實時上送。針對這些數據,咱們作了一些數據可視化的嘗試,並把數據貢獻給研究院的其餘團隊進行分析,或用做開發算法的參考信息和原始數據。
基於智能插座設備廠商提供的 MQTT 方案,咱們嘗試將 MQTT 協議的數據都轉發到 Pulsar 中。在轉發過程當中,咱們遇到的主要問題是 MQTT topic 和 Pulsar topic 的映射。咱們的解決方案是直接把全部的 MQTT 數據轉發到同一個 Pulsar topic 中,同時把部分元數據包裝在轉發的消息中,再經過 Pulsar Functions 作消息路由,把消息轉發到不一樣的業務 topic 中。下圖展現瞭如何將傳感器產生的數據傳送至平臺並最終入庫。
在從 MQTT 轉發數據到 Pulsar 的過程當中,咱們默認把全部設備的數據都轉發到同一個 topic 中,並經過 verificate function 進行驗證(包括解密和內容檢查),保障數據的合法性。合法的數據會被轉發到一箇中間 topic 等待消息路由分發,消息分發的 function 會從數據中解析出設備類型和消息類型,再轉發到對應業務 topic 中,等待被對應業務 topic 綁定的 ETL function 作處理。在使用 ETL function 處理時,咱們也會根據設備類型提取不一樣的數據,對網關設備提取網關狀態、設備信息,對插座提取用電數據和插座的狀態信息。這些信息會匹配咱們平臺的 Schema Registry 數據結構,咱們再把生成的數據作 Schema Mapping(經過 Functions 實現),最後統一轉發這些結構化的數據到 sink topic 中,由 sink function 寫入到數據庫。
樓宇智慧用電的遷移測試有力驗證了 Pulsar 符合咱們的需求。在遷移過程當中,咱們查閱了 Pulsar 文檔,從社區得到了大力支持和幫助,遷移過程高效、順利。藉助 Functions 的開放與便利,咱們很快完成了流程圖中全部 function 的開發和調試,上線了整個業務系統。
在業務遷移過程當中,Pulsar 運行狀態良好,團隊一致認爲 Pulsar 能夠幫助咱們減輕開發和運維負擔,因此咱們選擇 Pulsar 做爲研究中心惟一的消息中間件服務,咱們的小團隊也開始跟隨 Pulsar 一塊兒進行一系列雲原生遷移和優化工做。
決定方案後,咱們將 Apache Pulsar 進一步應用到電網智能傳感和智能變電所的場景,這些場景都與物聯網、能源和電力相關。下文將詳細介紹咱們如何使用 Pulsar 和 Pulsar Functions,以及如何經過 Pulsar Functions 簡化傳感器數據流的相關處理。
電網智能傳感場景主要基於清華大學能源互聯網創新研究院與電網公司合做的輸電線路智能多參數傳感器集成研究項目。該項目的傳感器來自不一樣的廠家,分佈在輸電線路的各個位置,傳感器類型所以也不盡相同,包括杆塔、杆塔上、輸電線路側等十多種。整個系統目前接入總長度約六百千米,包含六百多個杆塔的輸電線路傳感器。這一場景主要負責對各類傳感器的數據進行在線監測和告警,同時,咱們也單獨針對電壓傳感器作了暫態電壓分析。
這個應用場景有兩個難點:一是來自不一樣廠商的傳感器沒有統一的通訊協議,有的使用電力相關的 IEC104 規約,有的使用 protobuf 或其餘廠商自定義協議;二是項目數據量比較大,有些傳感器可能會單次產生 20 MB 甚至更大的消息,有些傳感器則每秒上傳一次數據。
藉助 Pulsar,咱們選擇在 producer 端不作任何數據處理,直接將數據轉發到 Pulsar 中,再經過 Pulsar Functions 作進一步的數據預處理和其餘業務操做。以電壓傳感器爲例,電壓傳感器會產生三類數據,分別是心跳數據、穩態波形數據和暫態波形數據。其中心跳數據和穩態波形數據經過 protobuf 協議傳輸,暫態數據則經過 zip 壓縮文件的形式傳輸。接收到 protobuf 的數據後,藉助 Pulsar Functions 進行一系列的數據處理,包括經過解密 function 完成數據解密和 protobuf 的反序列化,再對數據進行路由,經過對應的 ETL function 作數據處理和解析,最後經過 Schema Mapping 將數據入庫。咱們把這個流程的每一步都封裝成獨立的 Pulsar function,這樣作出於三點考慮:
這個方案也遇到了一些小困難,好比因爲 function 比較多,咱們須要花更多時間部署、維護每個過程的中間 topic。目前,咱們的解決方案是直接寫對應的代碼一次性完成部署和維護。雖然須要投入更多精力,但咱們認爲這種 function 的開發和部署模式是值得的。上文提到電壓傳感器除了會產生 protobuf 的兩種數據外,還會產生一種暫態數據。暫態數據通常在電網發生故障或異常時產生,相似電力系統的快照,記錄故障發生前到發生時,再到發生後的波形狀態。在電力系統中,暫態數據一般有標準的存儲方案和特定的解析接口。相對於傳感器產生的其餘數據來講,這類數據的特色是比較大,動輒幾十兆。咱們應對暫態數據的方案是先解壓縮這些數據,再分析數據文件。這裏咱們藉助了 Pulsar Functions 多語言支持的特性,流程圖中的藍色部分使用 Go function 實現,黃色部分使用 Python 實現,Python 有一個解析電網暫態數據的庫,能夠調用,就免去了咱們本身花時間實現一套 Go 版本解析接口的工做。
智能變電所是咱們在變電系統中變電環節的一些嘗試,這個項目基於咱們合做的智能輸變電設備廠商,但願基於開關櫃等變電所設備實現變電所的數據接入。這個項目的主要目標是實現實時監測、故障診斷和異常監測這三大功能。
在智能變電所的場景中,一般由設備生產廠商提供設備的故障診斷算法或診斷應用,咱們須要將不一樣性質的算法或應用集成到現有方案中。客戶提供的算法可能直接在 Pulsar Functions 中調用,也多是已經編譯好的可執行文件,甚至多是其餘語言的實現,好比 R 語言。針對這一系列問題,咱們先把客戶提供的實現封裝在 Docker 容器中,在容器中實現一個最小的 Pulsar function runtime,再經過 Docker proxy function 和 Docker endpoint 溝通,在觸發 function 時建立對應算法的容器實現計算,最後將結果回傳到 Pulsar 對應的 topic 中。
另外,在這一場景中咱們也遇到了一些應用層面的需求,好比消息推送。咱們藉助 Pulsar Functions 實現了一些業務功能,在 Functions 中能夠很方便地調用不一樣服務商的接口,實現消息推送,好比短信、郵件、應用程序的推送服務。此外,經過 Pulsar Functions,咱們得以把消息推送的業務需求從平臺中解藕出來,把服務作成 function,便於後續在有一樣需求的場景中直接使用。
咱們在使用 Pulsar 的過程當中遇到了一些問題,下文會分享解決這些問題的一些經驗,但願能夠對準備或者已經在使用 Pulsar 的同窗提供一些幫助。
第一個是關於 Pulsar 默認消息大小的問題。在默認配置下,Pulsar 支持的最大消息是 5 MB,在上文提到的智慧電網案例中,單條消息有時會超過 20 MB。咱們根據文檔修改了 broker 配置文件中的 MaxMessageSize
參數,但修改的配置並無生效,超過 5 MB 的消息依然不能正常傳遞到 Pulsar 中。因而咱們在 Pulsar 社區尋求幫助,獲得了社區的迅速回應。這個問題的主要緣由是 Pulsar 2.4.0 中 MaxMessageSize
沒有同步到 BookKeeper,因此即便 broker 能夠接收更大的消息,broker 仍然不能把消息傳遞到負責存儲的 BookKeeper 中。所以除了修改 MaxMessageSize
值外,還須要修改 broker 和 BookKeeper 中 nettyFrameSizeBytes
相關配置,這些配置保持一致,Pulsar 就能夠處理更大的單條消息。
第二個問題是咱們在使用 Pulsar Functions 處理數據時,topic 中可能會出現 backlog 積壓愈來愈多的狀況。Backlog 包括沒有發送給 Functions(consumer)的數據,也包括已發送但未被 Functions(consumer)ack 的數據。根據咱們的經驗,在 Functions 場景下,消息積壓多是由於 function 處理單條消息的速度慢,處理時間長,或者 function 崩潰。若是是由於 function 處理消息慢,一種解決方案是增長 function 的並行數量,再具體分析執行速度慢的緣由並進行優化;另外一種方案是把複雜的 function 分紅多個簡單的 function,也就是在智能電網場景中提到的把一個複雜的 function 拆成多個 function,經過 function 的鏈式模式把整個流程連接起來。這樣咱們能夠很方便地觀測每個 function 的狀態,也能夠針對某個 function 作進一步的優化。若是因爲 function 崩潰形成 backlog 積壓,則須要保障 function 的穩定性,並藉助 function 的 log topic 進行調試。
第三個問題是當 producer 數量增長時,很難統一管理和觀測每一個 producer 的狀態,即 producer 與 broker 之間的通訊狀態和 producer 與數據源之間的通訊狀態。針對這個問題,咱們目前的解決方案是給 producer 增長心跳消息到對應的心跳 topic 作總體監控,同時,監控 producer 和 broker 的狀態鏈接。經過這些改動,咱們能夠較好地聚合觀測 producer 的運行狀態。咱們注意到 GitHub 上也在討論相似問題,期待和社區一塊兒提出更優秀的解決方案。
咱們期待 Pulsar 能改善或增長如下功能。
做爲一個開源項目,Pulsar 正在快速發展,文檔更新迅速,社區響應及時,社區規模不斷壯大。咱們但願深刻了解 Pulsar,參與 Pulsar 開發貢獻,和社區分享咱們的實踐經驗,與 Pulsar 社區共同發展。
在使用 Pulsar 的過程當中,咱們遇到一些困惑,感謝 StreamNative 團隊小夥伴們的大力支持,幫助咱們順利將 Pulsar 應用到上述業務場景中。將來,咱們會積極嘗試 Pulsar 的各類新功能,並將 Pulsar 應用於更多的能源互聯網場景中。
胡軍,清華大學電機系副教授,清華大學能源互聯網創新研究院能源大數據與開放生態研究中心執行主任,IEEE Member,CIGRE Member。