spark 集羣優化

只有滿懷自信的人,能在任何地方都懷有自信,沉浸在生活中,並認識本身的意志。git

前言

最近公司有一個生產的小集羣,專門用於運行spark做業。可是偶爾會由於nn或dn壓力過大而致使做業checkpoint操做失敗進而致使spark 流任務失敗。本篇記錄從應用層面對spark做業進行優化,進而達到優化集羣的做用。github

集羣使用狀況

有數據的目錄以及使用狀況以下:sql

目錄
說明
大小
文件數量
數據數量佔比
數據大小佔比
/user/root/.sparkStaging/applicationIdxxx spark任務配置以及所需jar包 5G 約1k 約20% 約100%
/tmp/checkpoint/xxx/{commits|metadata|offsets|sources} checkpoint文件,其中commits和offsets頻繁變更 2M 約4k 約40% 約0%

對於.sparkStaging目錄,不常常變更,只須要優化其大小便可。apache

對於 checkpoint目錄,頻繁性增刪,從生成周期和保留策略兩方面去考慮。bootstrap

 .sparkStaging目錄優化

對於/user/hadoop/.sparkStaging下文件,是spark任務依賴文件,能夠將jar包上傳到指定目錄下,避免或減小了jar包的重複上傳,進而減小任務的等待時間。api

能夠在spark的配置文件spark-defaults.conf配置以下內容:bash

spark.yarn.archive=hdfs://hdfscluster/user/hadoop/jars
spark.yarn.preserve.staging.files=false

參數說明

Property Name
Default
Meaning
spark.yarn.archive (none)
An archive containing needed Spark jars for distribution to the YARN cache. If set, this configuration replaces spark.yarn.jars and the archive is used in all the application's containers. The archive should contain jar files in its root directory. Like with the previous option, the archive can also be hosted on HDFS to speed up file distribution.
spark.yarn.preserve.staging.files false Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them.

checkpoint優化

首先了解一下 checkpoint文件表明的含義。app

checkpoint文件說明

  • offsets 目錄 - 預先記錄日誌,記錄每一個批次中存在的偏移量。爲了確保給定的批次將始終包含相同的數據,咱們在進行任何處理以前將其寫入此日誌。所以,該日誌中的第N個記錄指示當前正在處理的數據,第N-1個條目指示哪些偏移已持久地提交給sink。async

  • commits 目錄 - 記錄已完成的批次ID的日誌。這用於檢查批處理是否已徹底處理,而且其輸出已提交給接收器,所以無需再次處理。(例如)在從新啓動過程當中使用,以幫助識別接下來要運行的批處理。maven

  • metadata 文件 - 與整個查詢關聯的元數據,只有一個 StreamingQuery 惟一ID

  • sources目錄 - 保存起始offset信息

下面從兩個方面來優化checkpoint。

第一,從觸發checkpoint機制方面考慮

trigger的機制

Trigger是用於指示 StreamingQuery 多久生成一次結果的策略。

Trigger有三個實現類,分別爲:

  • OneTimeTrigger - A Trigger that processes only one batch of data in a streaming query then terminates the query.

  • ProcessingTime - A trigger that runs a query periodically based on the processing time. If interval is 0, the query will run as fast as possible.by default,trigger is ProcessingTime, and interval=0

  • ContinuousTrigger - A Trigger that continuously processes streaming data, asynchronously checkpointing at the specified interval.

能夠爲 ProcessingTime 指定一個時間 或者使用 指定時間的ContinuousTrigger ,固定生成checkpoint的週期,避免checkpoint生成過於頻繁,減輕多任務下小集羣的nn的壓力

 

第二,從checkpoint保留機制考慮。

保留機制

spark.sql.streaming.minBatchesToRetain - 必須保留並使其可恢復的最小批次數,默認爲 100

能夠調小保留的batch的次數,好比調小到 20,這樣 checkpoint 小文件數量總體能夠減小到原來的 20%

checkpoint 參數驗證

主要驗證trigger機制保留機制

驗證trigger機制

未設置trigger效果

未設置trigger前,spark structured streaming 的查詢batch提交的週期截圖以下:

每個batch的query任務的提交是毫無週期規律可尋。

設置trigger代碼

trigger效果

設置trigger代碼後效果截圖以下:

每個batch的query任務的提交是有規律可尋的,即每隔5s提交一次代碼,即trigger設置生效

注意,若是消息不能立刻被消費,消息會有積壓,structured streaming 目前並沒有與spark streaming效果等同的背壓機制,爲防止單批次query查詢的數據源數據量過大,避免程序出現數據傾斜或者沒法挽回的OutOfMemory錯誤,能夠經過 maxOffsetsPerTrigger 參數來設置單個批次容許抓取的最大消息條數。

使用案例以下:

spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xxx:9092")
    .option("subscribe", "test-name")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1)
    .option("group.id", "2")
    .option("auto.offset.reset", "earliest")
    .load()

驗證保留機制

默認保留機制效果

spark任務提交參數

#!/bin/bash
spark-submit \
--class zd.Example \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/ \
/root/spark-test-1.0-SNAPSHOT.jar

 

以下圖,offsets和commits最終最少各保留100個文件。

修改保留策略

經過修改任務提交參數來進一步修改checkpoint的保留策略。

添加 --conf spark.sql.streaming.minBatchesToRetain=2 ,完整腳本以下:

#!/bin/bash
spark-submit \
--class zd.Example \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/ \
--conf spark.sql.streaming.minBatchesToRetain=2 \
/root/spark-test-1.0-SNAPSHOT.jar

修改後保留策略效果

修改後保留策略截圖以下:

即 checkpoint的保留策略參數設置生效

總結

綜上,能夠經過設置 trigger 來控制每個batch的query提交的時間間隔,能夠經過設置checkpoint文件最少保留batch的大小來減小checkpoint小文件的保留個數。

參照

  1. https://github.com/apache/spark/blob/master/docs/running-on-yarn.md
  2. https://blog.csdn.net/lm709409753/article/details/85250859
  3. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
  4. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
  5. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
  6. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
  7. https://github.com/apache/spark/blob/v2.4.3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
相關文章
相關標籤/搜索