本文爲阿里巴巴技術專家餘根茂在社區發的一篇文章。php
Structured Streaming 最初是在 Apache Spark 2.0 中引入的,它已被證實是構建分佈式流處理應用程序的最佳平臺。SQL/Dataset/DataFrame API 和 Spark 的內置函數的統一使得開發人員能夠輕鬆實現複雜的需求,好比支持流聚合、流-流 Join 和窗口。自從 Structured Streaming 發佈以來,社區的開發人員常常要求須要更好的方法來管理他們的流做業,就像咱們在 Spark Streaming 中所作的那樣。爲此,Apache Spark 3.0 爲 Structured Streaming 開發了一套全新的 UI。html
新的 Structured Streaming UI 經過有用的信息和統計信息提供了一種簡單的方法來監控全部流做業,從而使開發調試期間的故障排除變得更容易,在生產環境下經過實時度量更好的理解咱們的做業瓶頸。新的 UI 提供了兩組統計信息:java
- 流查詢做業的聚合信息;
- 流查詢的詳細統計信息,包括輸入速率(Input Rate)、處理速率(Process Rate)、輸入行數(Input Rows)、批處理持續時間(Batch Duration,)、操做持續時間等(Operation Duration)。
文章目錄bootstrap
流查詢做業的聚合信息
當開發人員提交一個流 SQL 查詢時,這個做業的信息將在 Structured Streaming 選項卡中顯示,其中包括活動的流查詢和已完成的流查詢。流查詢的一些基本信息將在結果表中列出,包括查詢名稱、狀態、ID、運行 ID、提交時間、查詢持續時間、最後一個批次的 ID 以及聚合信息,如平均輸入速率和平均處理速率。流查詢的狀態有三種:運行(RUNNING),完成(FINISHED)以及失敗(FAILED)。全部完成的和失敗的查詢都在已完成的流查詢列表中顯示。表格中的錯誤列(Error)顯示失敗查詢的異常詳細信息。具體以下:服務器
咱們能夠經過單擊表格中 Run ID 那列連接查看流查詢的詳細統計信息。微信
詳細統計信息
統計信息頁面顯示了包括輸入/處理速率、延遲和詳細操做持續時間在內的指標,這對於洞察流查詢的狀態很是有用,使咱們可以輕鬆地調試流做業運行過程當中的異常狀況。頁面以下所示:dom
上圖包含如下的監控信息:分佈式
- Input Rate: 全部數據源數據流入的聚合以後速度
- Process Rate: Spark 處理全部數據源的處理速度,也是聚合後的結果。
- Batch Duration: 每一個批次處理時間。
- Operation Duration: 執行各類操做所花費的時間,以毫秒爲單位。
使用新的 UI 進行故障排除
在這一小節,讓咱們來看看如何使用 Structured Streaming 新的 UI 來進行異常排除。咱們的測試代碼以下:函數
import java.util.UUID val bootstrapServers = ... val topics = ... val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topics) .load() .selectExpr("CAST(value AS STRING)") .as[String] val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("console") .option("checkpointLocation", checkpointLocation) .start()
因爲處理能力不足而致使延遲增長
在第一種狀況下,咱們運行查詢來儘快處理 Apache Kafka 中讀取的數據。在每批中,流做業將處理 Kafka 中全部可用的數據。若是咱們的資源不足以快速處理當前批次的數據,那麼延遲將迅速增長。最直觀的判斷是輸入行和批處理持續時間會呈線性增加。處理速率(Process Rate)提示流做業最多隻能處理大約8,000條記錄/秒。可是當前的輸入速率大約是每秒 20,000 條記錄。咱們能夠爲流做業提供更多的執行資源,或者添加足夠的分區來處理這些數據。oop
處理時間比較穩定但延遲很高
這種狀況相比第一種狀況是處理延遲沒有持續增長,具體以下所示:
咱們發如今相同的輸入速率(Input Rate)下,處理速率(Process Rate)能夠保持穩定。這意味着做業的處理能力足以處理輸入數據。然而,每批處理的進程持續時間(即延遲)仍然高達20秒。高延遲的主要緣由是每一個批處理中有太多數據須要處理。一般咱們能夠經過增長做業的並行性來減小延遲。在爲 Spark 任務添加了10個Kafka分區和10個核心以後,咱們發現延遲大約爲5秒——比20秒要好得多。
使用 Operation Duration 圖來進行異常診斷
操做持續時間(Operation Duration)圖以毫秒爲單位顯示執行各類操做所花費的時間。這對於瞭解每一個批次的時間分佈並簡化故障排除頗有用。 讓咱們以Apache Spark 社區中的性能改進 SPARK-30915 爲例進行說明。
在 SPARK-30915 工做以前,當壓縮後的元數據日誌變得很大時,壓縮後的下一批處理要比其餘批處理花費更多的時間。
通過對代碼進行分析以後,發現並修復了沒必要要的讀取壓縮日誌文件的問題,也就是 SPARK-30915 解決的,下圖的運行時間確認了咱們預期的效果:
將來工做
經過上面三個案例,新的 Structured Streaming UI 將幫助開發人員經過更加有用的流查詢信息來更好地監視流做業。做爲早期發佈版本,新的 UI 仍在開發中,並將在之後的版本中進行改進,包括但不限於如下功能:
- 更多流查詢執行細節:延遲數據(late data),水印(watermark),狀態數據指標(state data metrics)等等。
- Spark 歷史服務器中支持 Structured Streaming UI。
- 針對異常狀況的更明顯的提示:好比延遲發生等。
本文翻譯自:A look at the new Structured Streaming UI in Apache Spark™ 3.0
本博客文章除特別聲明,所有都是原創!轉載本文請加上:轉載自過往記憶(https://www.iteblog.com/)
本文連接: 【如何使用 Spark 3.0 中新加的 Structured Streaming UI 來進行異常分析】(https://www.iteblog.com/archives/9844.html)