Apache Spark™ 3.0中全新的Structured Streaming UI

做者:Genmao Yu
原文連接:https://databricks.com/blog/2020/07/29/a-look-at-the-new-structured-streaming-ui-in-apache-spark-3-0.htmlhtml

編譯:邵嘉陽,計算機科學與技術大三在讀,Apache Spark 中文社區志願者java


在Apache Spark 2.0中,咱們迎來了Structured Streaming——構建分佈式流處理應用的最佳平臺。統一的API(SQL,Dataset和DataFrame)以及Spark內置的大量函數爲開發者實現複雜的需求提供了便利,好比流的聚合,流-流鏈接和窗口支持。開發者們廣泛喜歡經過Spark Streaming中的DStream的方式來管理他們的流,那麼相似的功能何時能在Structured Streaming中獲得實現呢?這不,在Apache Spark 3.0中,全新的Structured Streaming可視化UI和開發者們見面了。web

新的Structured Streaming UI會提供一些有用的信息和統計數據,以此來監視全部流做業,便於在開發調試過程當中排除故障。同時,開發者還可以得到實時的監測數據,這能使生產流程更直觀。在這個新的UI中,咱們會看到兩組統計數據:1)流查詢做業的聚合信息;2)流查詢的具體統計信息,包括輸入速率(Input Rate)、處理速率(Process Rate)、輸入行數(Input Rows)、批處理持續時間(Batch Duration)和操做持續時間(Operation Duration)等。apache

流查詢做業的聚合信息

開發者提交的流SQL查詢會被列在Structured Streaming一欄中,包括正在運行的流查詢(active)和已完成的流查詢(completed)。結果表則會顯示流查詢的一些基本信息,包括查詢名稱、狀態、ID、運行ID、提交時間、查詢持續時間、最後一批的ID以及一些聚合信息,如平均輸入速率和平均處理速率。流查詢有三種狀態:運行(RUNNING)、結束(FINISHED)、失敗(FAILED)。全部結束(FINISHED)和失敗(FAILED)的查詢都在已完成的流式查詢表中列出。Error列顯示有關失敗查詢的詳細信息。bootstrap

咱們能夠經過單擊Run ID連接查看流查詢的詳細信息。服務器

詳細的統計信息

Statistics頁面顯示了包括輸入速率、處理速率、延遲和詳細的操做持續時間在內的一系列指標。經過圖表,開發者能全面瞭解已提交的流查詢的狀態,而且輕鬆地調試查詢處理中的異常狀況。

微信

它包含如下指標:app

  • Input Rate:數據到達的聚合速率(跨全部源)。dom

  • Process Rate:Spark處理數據的聚合速率(跨全部源)。分佈式

  • Batch Duration:每一批的處理時間。

  • Operation Duration:執行各類操做所花費的時間(以毫秒爲單位)。
    被追蹤的操做羅列以下:

  • addBatch:從源讀取微批的輸入數據、對其進行處理並將批的輸出寫入接收器所花費的時間。這應該會佔用微批處理的大部分時間。

  • getBatch:準備邏輯查詢以從源讀取當前微批的輸入所花費的時間。

  • getOffset:查詢源是否有新的輸入數據所花費的時間。

  • walCommit:將偏移量寫入元數據日誌。

  • queryPlanning:生成執行計劃。

須要注意的是,因爲數據源的類型不一樣,一個查詢可能不會包含以上列出的全部操做。

使用UI解決流的性能故障

在這一部分中,咱們會看到新的UI是怎樣實時、直觀地顯示查詢執行過程當中的異常狀況的。咱們會在每一個例子中預先假設一些條件,樣例查詢看起來是這樣的:

import java.util.UUID

val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()

因爲處理能力不足而增長延遲

在第一種狀況下,咱們但願儘快處理Apache Kafka數據。在每一批中,流做業將處理Kafka中全部可用的數據。若是處理能力不足以處理批數據,那麼延遲將迅速增長。最直觀的現象是Input Rows和Batch Duration會呈線性上升。Process Rate提示流做業每秒最多隻能處理大約8000條記錄,可是當前的輸入速率是每秒大約20000條記錄。產生問題的緣由一目瞭然,那麼咱們能夠爲流做業提供更多的執行資源,或者添加足夠的分區來處理與生產者匹配所需的全部消費者。

穩定但高延遲

第二種狀況下,延遲並無持續增長,而是保持穩定,以下截圖所示:

咱們發如今相同的Input Rate下,Process Rate能夠保持穩定。這意味着做業的處理能力足以處理輸入數據。然而,每批的延遲仍然高達20秒。這裏,高延遲的主要緣由是每一個批中有太多數據,那麼咱們能夠經過增長這個做業的並行度來減小延遲。在爲Spark任務添加了10個Kafka分區和10個內核以後,咱們發現延遲大約爲5秒——比20秒要好得多。

使用操做持續時間圖進行故障排除

操做持續時間圖(Operation Duration Chart)顯示了執行各類操做所花費的時間(以毫秒爲單位)。這對於瞭解每一個批處理的時間分佈和故障排除很是有用。讓咱們以Apache Spark社區中的性能改進「Spark-30915:在查找最新批處理ID時避免讀取元數據日誌文件「爲例。
在某次查詢中咱們發現,當壓縮後的元數據日誌很大時,下一批要花費比其餘批更多的時間來處理。

在進行代碼審查以後,咱們發現這是由對壓縮日誌文件的沒必要要讀取形成的並進行了修復。新的操做持續時間圖確認了咱們想法:

將來的開發方向

如上所示,新的Structured Streaming UI將經過提供更有用的流查詢信息幫助開發者更好地監視他們的流做業。做爲早期發佈版本,新的UI仍在開發中,並將在將來的發佈中獲得改進。有幾個將來能夠實現的功能,包括但不限於:

  • 更多的流查詢執行細節:延遲數據,水印,狀態數據指標等等。

  • 在Spark歷史服務器中支持Structured Streaming UI。

  • 對於不尋常的狀況有更明顯的提示:發生延遲等。


近期活動:

9大訓練營免費開營!阿里雲大數據團隊的獨門絕學全在這了

8月24日開始 Spark 實戰訓練營正式開課,戳文末 閱讀原文 便可報名

免費報名連接:https://developer.aliyun.com/learning/trainingcamp/spark/2


阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,按期推送精彩案例,技術專家直播,問答區近萬人Spark技術同窗在線提問答疑,只爲營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

對開源大數據和感興趣的同窗能夠加小編微信(下圖二維碼,備註「進羣」)進入技術交流微信羣。

Apache Spark技術交流社區公衆號,微信掃一掃關注


本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索