【譯】Yarn上常駐Spark-Streaming程序調優

做者從容錯、性能等方面優化了長時間運行在yarn上的spark-Streaming做業git

 

 

對於長時間運行的Spark Streaming做業,一旦提交到YARN羣集便須要永久運行,直到有意中止。任何中斷都會引發嚴重的處理延遲,並可能致使數據丟失或重複。YARN和Apache Spark都不是爲了執行長時間運行的服務而設計的。可是,它們已經成功地知足了近實時數據處理做業的常駐需求。成功並不必定意味着沒有技術挑戰。github

這篇博客總結了在安全的YARN集羣上,運行一個關鍵任務且長時間的Spark Streaming做業的經驗。您將學習如何將Spark Streaming應用程序提交到YARN羣集,以免在值班時候的不眠之夜。shell

 

Fault tolerance

 在YARN集羣模式下,Spark驅動程序與Application Master(應用程序分配的第一個YARN容器)在同一容器中運行。此過程負責從YARN 驅動應用程序和請求資源(Spark執行程序)。重要的是,Application Master消除了在應用程序生命週期中運行的任何其餘進程的須要。即便一個提交Spark Streaming做業的邊緣Hadoop節點失敗,應用程序也不會受到影響。apache

要以集羣模式運行Spark Streaming應用程序,請確保爲spark-submit命令提供如下參數:編程

spark-submit --master yarn --deploy-mode cluster

因爲Spark驅動程序和Application Master共享一個JVM,Spark驅動程序中的任何錯誤都會阻止咱們長期運行的工做。幸運的是,能夠配置從新運行應用程序的最大嘗試次數。設置比默認值2更高的值是合理的(從YARN集羣屬性yarn.resourcemanager.am.max嘗試中導出)。對我來講,4工做至關好,即便失敗的緣由是永久性的,較高的值也可能致使沒必要要的從新啓動。緩存

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4

若是應用程序運行數天或數週,而不從新啓動或從新部署在高度使用的羣集上,則可能在幾個小時內耗盡4次嘗試。爲了不這種狀況,嘗試計數器應該在每一個小時都重置。安全

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h

 

另外一個重要的設置是在應用程序發生故障以前executor失敗的最大數量。默認狀況下是max(2 * num executors,3),很是適合批處理做業,但不適用於長時間運行的做業。該屬性具備相應的有效期間,也應設置。app

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h

對於長時間運行的做業,您也能夠考慮在放棄做業以前提升任務失敗的最大數量。默認狀況下,任務將重試4次,而後做業失敗。dom

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8

 

Performance

當Spark Streaming應用程序提交到集羣時,必須定義運行做業的YARN隊列。我強烈建議使用YARN Capacity Scheduler並將長時間運行的做業提交到單獨的隊列。沒有一個單獨的YARN隊列,您的長時間運行的工做早晚將被的大量Hive查詢搶佔。jvm

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue

 

Spark Streaming工做的另外一個重要問題是保持處理時間的穩定性和高度可預測性。處理時間應保持在批次持續時間如下以免延誤。我發現Spark的推測執行有不少幫助,特別是在繁忙的羣集中。當啓用推測性執行時,批處理時間更加穩定。只有當Spark操做是冪等時,才能啓用推測模式。

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue \
    --conf spark.speculation=true

 

Security

在安全的HDFS羣集上,長時間運行的Spark Streaming做業因爲Kerberos票據到期而失敗。沒有其餘設置,當Spark Streaming做業提交到集羣時,會發布Kerberos票證。當票證到期時Spark Streaming做業不能再從HDFS寫入或讀取數據。

在理論上(基於文檔),應該將Kerberos主體和keytab做爲spark-submit命令傳遞:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab

 

實際上,因爲幾個錯誤(HDFS-9276SPARK-11182)必須禁用HDFS緩存。若是沒有,Spark將沒法從HDFS上的文件讀取更新的令牌。

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true

Mark Grover指出,這些錯誤隻影響在HA模式下配置了NameNodes的HDFS集羣。謝謝,馬克

 

 

Logging

訪問Spark應用程序日誌的最簡單方法是配置Log4j控制檯追加程序,等待應用程序終止並使用yarn logs -applicationId [applicationId]命令。不幸的是終止長時間運行的Spark Streaming做業來訪問日誌是不可行的。

我建議安裝和配置Elastic,Logstash和Kibana(ELK套裝)。ELK的安裝和配置是超出了這篇博客的範圍,但請記住記錄如下上下文字段:

  • YARN application id
  • YARN container hostname
  • Executor id (Spark driver is always 000001, Spark executors start from 000002)
  • YARN attempt (to check how many times Spark driver has been restarted)

Log4j配置使用Logstash特定的appender和佈局定義應該傳遞給spark-submit命令:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
     --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --files /path/to/log4j.properties

最後,Spark Job的Kibana儀表板可能以下所示:

 

 

Monitoring

長時間運行的工做全天候運行,因此瞭解歷史指標很重要。Spark UI僅在有限數量的批次中保留統計信息,而且在從新啓動後,全部度量標準都消失了。再次,須要外部工具。我建議安裝Graphite用於收集指標和Grafana來創建儀表板。

首先,Spark須要配置爲將指標報告給Graphite,準備metrics.properties文件:

 

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port]
*.sink.graphite.prefix=some_meaningful_name

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

 

Graceful stop

最後一個難題是如何以優雅的方式中止部署在YARN上的Spark Streaming應用程序。中止(甚至殺死)YARN應用程序的標準方法是使用命令yarn application -kill [applicationId]。這個命令會中止Spark Streaming應用程序,但這可能發生在批處理中。所以,若是該做業是從Kafka讀取數據而後在HDFS上保存處理結果,並最終提交Kafka偏移量,看成業在提交偏移以前中止工做時,您應該預見到HDFS會有重複的數據。

解決優雅關機問題的第一個嘗試是在關閉程序時回調Spark Streaming Context的中止方法。

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

使人失望的是,因爲Spark應用程序幾乎當即被殺死,一個退出回調函數來不及完成已啓動的批處理任務。此外,不能保證JVM會調用shutdown hook。

在撰寫本博客文章時,惟一確認的YARN Spark Streaming應用程序的確切方法是通知應用程序關於計劃關閉,而後以編程方式中止流式傳輸(但不是關閉掛鉤)。命令yarn application -kill 若是通知應用程序在定義的超時後沒有中止,則應該僅用做最後手段。

可使用HDFS上的標記文件(最簡單的方法)或使用驅動程序上公開的簡單Socket / HTTP端點(複雜方式)通知應用程序。

由於我喜歡KISS原理,下面你能夠找到shell腳本僞代碼,用於啓動/中止Spark Streaming應用程序使用標記文件:

start() {
    hdfs dfs -touchz /path/to/marker/my_job_unique_name
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/my_job_unique_name
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}

在Spark Streaming應用程序中,後臺線程應該監視標記文件,當文件消失時中止上下文調用

streamingContext.stop(stopSparkContext = true, stopGracefully = true).

 

Summary

 能夠看到,部署在YARN上的關鍵任務Spark Streaming應用程序的配置至關複雜。以上提出的技術,由一些很是聰明的開發人員通過漫長而冗長乏味的迭代學習。最終,部署在高可用的YARN集羣上的長期運行的Spark Streaming應用很是穩定。

 

 

 

 

原文地址:http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/

相關文章
相關標籤/搜索