Spark on YARN Memory Allocation

This article looks at how memory is allocated in the Spark on YARN deployment mode. Since I have not studied the Spark source code in depth, I can only follow the logs back to the relevant source code to understand "why it behaves this way".

Overview

Depending on where the Spark application's driver runs, Spark on YARN has two modes: yarn-client mode and yarn-cluster mode.

When a Spark job runs on YARN, each Spark executor runs as a YARN container. Spark can run multiple tasks inside the same container.

The following figure shows job execution in yarn-cluster mode (image from the web):

For the Spark on YARN configuration parameters, see the Spark configuration reference. This article focuses on memory allocation, so we only need to pay attention to the following memory-related parameters:

  • spark.driver.memory: default 512m
  • spark.executor.memory: default 512m
  • spark.yarn.am.memory: default 512m
  • spark.yarn.executor.memoryOverhead: executorMemory * 0.07, with a minimum of 384 (see the small sketch after this list)
  • spark.yarn.driver.memoryOverhead: driverMemory * 0.07, with a minimum of 384
  • spark.yarn.am.memoryOverhead: AM memory * 0.07, with a minimum of 384
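
As a quick sanity check of the overhead rule above, here is a minimal sketch (not Spark's actual code; the constant names simply mirror the MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN values that appear later in this article):

// Minimal sketch of the overhead rule described above (values in MB, not Spark's actual code).
object OverheadSketch {
  val MEMORY_OVERHEAD_FACTOR = 0.07
  val MEMORY_OVERHEAD_MIN = 384

  // overhead = max(0.07 * memory, 384)
  def memoryOverhead(memoryMb: Int): Int =
    math.max((MEMORY_OVERHEAD_FACTOR * memoryMb).toInt, MEMORY_OVERHEAD_MIN)

  def main(args: Array[String]): Unit = {
    println(memoryOverhead(512))    // 384  -> default AM/driver memory
    println(memoryOverhead(3072))   // 384  -> 3g executor
    println(memoryOverhead(92160))  // 6451 -> 90g executor, roughly 6.3G
  }
}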

Notes:

  • --executor-memory/spark.executor.memory controls the executor heap size, but the JVM itself also needs some memory beyond that, such as interned Strings and direct byte buffers. The spark.yarn.XXX.memoryOverhead properties determine how much extra memory is requested from YARN for each executor, driver, or AM; the default is max(384, 0.07 * spark.executor.memory).
  • Setting a very large executor memory often leads to long GC pauses; 64G is a recommended upper limit for executor memory.
  • The HDFS client has performance problems with a large number of concurrent threads. A rough estimate is that at most 5 parallel tasks per executor are enough to saturate the write bandwidth.

In addition, because the job is submitted to YARN, a few YARN parameters also matter; see the YARN memory and CPU configuration:

  • yarn.app.mapreduce.am.resource.mb: the maximum memory the AM can request, default 1536 MB
  • yarn.nodemanager.resource.memory-mb: the maximum memory a NodeManager can provide to containers, default 8192 MB
  • yarn.scheduler.minimum-allocation-mb: the minimum resource a container can request from the scheduler, default 1024 MB
  • yarn.scheduler.maximum-allocation-mb: the maximum resource a container can request from the scheduler, default 8192 MB

Test

The Spark cluster used for testing:

  • master: 64G memory, 16 CPU cores
  • worker: 128G memory, 32 CPU cores
  • worker: 128G memory, 32 CPU cores
  • worker: 128G memory, 32 CPU cores
  • worker: 128G memory, 32 CPU cores

Note: the YARN cluster is deployed on top of the Spark cluster, with a NodeManager running on each worker node, and the YARN cluster is configured as follows:

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>106496</value> <!-- 104G -->
</property>
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>2048</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>106496</value>
</property>
<property>
  <name>yarn.app.mapreduce.am.resource.mb</name>
  <value>2048</value>
</property>

Set Spark's log level to DEBUG and set log4j.logger.org.apache.hadoop to WARN to suppress unnecessary output, by modifying /etc/spark/conf/log4j.properties:

# Set everything to be logged to the console
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

Next, run a test program, using the bundled SparkPi example. The tests below mainly cover client mode; the cluster-mode analysis follows the same procedure. Run the following command:

spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-client \
    --num-executors 4 \
    --driver-memory 2g \
    --executor-memory 3g \
    --executor-cores 4 \
    /usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar \
    100000

Observe the output log (irrelevant lines omitted):

15/06/08 13:57:01 INFO SparkContext: Running Spark version 1.3.0
15/06/08 13:57:02 INFO SecurityManager: Changing view acls to: root
15/06/08 13:57:02 INFO SecurityManager: Changing modify acls to: root

15/06/08 13:57:03 INFO MemoryStore: MemoryStore started with capacity 1060.3 MB

15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: ClientArguments called with: --arg bj03-bi-pro-hdpnamenn:51568 --num-executors 4 --num-executors 4 --executor-memory 3g --executor-memory 3g --executor-cores 4 --executor-cores 4 --name Spark Pi
15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: [actor] handled message (24.52531 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#864850679]
15/06/08 13:57:05 INFO Client: Requesting a new application from cluster with 4 NodeManagers
15/06/08 13:57:05 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (106496 MB per container)
15/06/08 13:57:05 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/06/08 13:57:05 INFO Client: Setting up container launch context for our AM

15/06/08 13:57:07 DEBUG Client: ===============================================================================
15/06/08 13:57:07 DEBUG Client: Yarn AM launch context:
15/06/08 13:57:07 DEBUG Client:     user class: N/A
15/06/08 13:57:07 DEBUG Client:     env:
15/06/08 13:57:07 DEBUG Client:         CLASSPATH -> <CPS>/__spark__.jar<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/*<CPS>$HADOOP_COMMON_HOME/lib/*<CPS>$HADOOP_HDFS_HOME/*<CPS>$HADOOP_HDFS_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/*<CPS>$HADOOP_MAPRED_HOME/lib/*<CPS>$HADOOP_YARN_HOME/*<CPS>$HADOOP_YARN_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>:/usr/lib/spark/lib/spark-assembly.jar::/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
15/06/08 13:57:07 DEBUG Client:         SPARK_DIST_CLASSPATH -> :/usr/lib/spark/lib/spark-assembly.jar::/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_CACHE_FILES_FILE_SIZES -> 97237208
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_STAGING_DIR -> .sparkStaging/application_1433742899916_0001
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_CACHE_FILES_VISIBILITIES -> PRIVATE
15/06/08 13:57:07 DEBUG Client:         SPARK_USER -> root
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_MODE -> true
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_CACHE_FILES_TIME_STAMPS -> 1433743027399
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_CACHE_FILES -> hdfs://mycluster:8020/user/root/.sparkStaging/application_1433742899916_0001/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar#__spark__.jar
15/06/08 13:57:07 DEBUG Client:     resources:
15/06/08 13:57:07 DEBUG Client:         __spark__.jar -> resource { scheme: "hdfs" host: "mycluster" port: 8020 file: "/user/root/.sparkStaging/application_1433742899916_0001/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar" } size: 97237208 timestamp: 1433743027399 type: FILE visibility: PRIVATE
15/06/08 13:57:07 DEBUG Client:     command:
15/06/08 13:57:07 DEBUG Client:         /bin/java -server -Xmx512m -Djava.io.tmpdir=/tmp '-Dspark.eventLog.enabled=true' '-Dspark.executor.instances=4' '-Dspark.executor.memory=3g' '-Dspark.executor.cores=4' '-Dspark.driver.port=51568' '-Dspark.serializer=org.apache.spark.serializer.KryoSerializer' '-Dspark.driver.appUIAddress=http://bj03-bi-pro-hdpnamenn:4040' '-Dspark.executor.id=<driver>' '-Dspark.kryo.classesToRegister=scala.collection.mutable.BitSet,scala.Tuple2,scala.Tuple1,org.apache.spark.mllib.recommendation.Rating' '-Dspark.driver.maxResultSize=8g' '-Dspark.jars=file:/usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar' '-Dspark.driver.memory=2g' '-Dspark.eventLog.dir=hdfs://mycluster:8020/user/spark/applicationHistory' '-Dspark.app.name=Spark Pi' '-Dspark.fileserver.uri=http://X.X.X.X:49172' '-Dspark.tachyonStore.folderName=spark-81ae0186-8325-40f2-867b-65ee7c922357' -Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.deploy.yarn.ExecutorLauncher --arg 'bj03-bi-pro-hdpnamenn:51568' --executor-memory 3072m --executor-cores 4 --num-executors  4 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
15/06/08 13:57:07 DEBUG Client: ===============================================================================

The log line Will allocate AM container, with 896 MB memory including 384 MB overhead shows that the AM occupies 896 MB of memory; subtracting the 384 MB overhead leaves only 512 MB, which is the default value of spark.yarn.am.memory. The log also shows that the YARN cluster has 4 NodeManagers and that each container can use at most 106496 MB of memory.

The Yarn AM launch context starts a Java process whose JVM heap is set to 512m, see /bin/java -server -Xmx512m.

Why is the default value used here? Look at the code that prints the log line above, in org.apache.spark.deploy.yarn.Client:

private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
  val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
  logInfo("Verifying our application has not requested more than the maximum " +
    s"memory capability of the cluster ($maxMem MB per container)")
  val executorMem = args.executorMemory + executorMemoryOverhead
  if (executorMem > maxMem) {
    throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
      s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
  }
  val amMem = args.amMemory + amMemoryOverhead
  if (amMem > maxMem) {
    throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
      s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
  }
  logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
    amMem,
    amMemoryOverhead))
}

args.amMemory comes from the ClientArguments class, which validates the input arguments:

private def validateArgs(): Unit = {
  if (numExecutors <= 0) {
    throw new IllegalArgumentException(
      "You must specify at least 1 executor!\n" + getUsageMessage())
  }
  if (executorCores < sparkConf.getInt("spark.task.cpus", 1)) {
    throw new SparkException("Executor cores must not be less than " +
      "spark.task.cpus.")
  }
  if (isClusterMode) {
    for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
      if (sparkConf.contains(key)) {
        println(s"$key is set but does not apply in cluster mode.")
      }
    }
    amMemory = driverMemory
    amCores = driverCores
  } else {
    for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
      if (sparkConf.contains(key)) {
        println(s"$key is set but does not apply in client mode.")
      }
    }
    sparkConf.getOption(amMemKey)
      .map(Utils.memoryStringToMb)
      .foreach { mem => amMemory = mem }
    sparkConf.getOption(amCoresKey)
      .map(_.toInt)
      .foreach { cores => amCores = cores }
  }
}

From the code above, when isClusterMode is true, args.amMemory takes the value of driverMemory; otherwise it is read from spark.yarn.am.memory, and if that property is not set, the default of 512m is used. isClusterMode is true only when userClass is not null (def isClusterMode: Boolean = userClass != null), i.e. the arguments must include --class, and the log below shows that the arguments passed to ClientArguments do not include it.
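
For clarity, that branch can be condensed into a small sketch (this only restates the logic, it is not the actual Spark code, and the parameter names are made up for illustration):

// Sketch of how the AM memory is resolved, condensing the ClientArguments logic above.
object AmMemorySketch {
  // amMemoryFromConf stands for an explicit spark.yarn.am.memory value, if any (illustrative name).
  def resolveAmMemoryMb(isClusterMode: Boolean,
                        driverMemoryMb: Int,
                        amMemoryFromConf: Option[Int]): Int =
    if (isClusterMode) driverMemoryMb      // cluster mode: the AM hosts the driver, so --driver-memory applies
    else amMemoryFromConf.getOrElse(512)   // client mode: spark.yarn.am.memory, defaulting to 512m

  def main(args: Array[String]): Unit = {
    // Our client-mode run set neither --class nor spark.yarn.am.memory:
    println(resolveAmMemoryMb(isClusterMode = false, driverMemoryMb = 2048, amMemoryFromConf = None)) // 512
  }
}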

15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: ClientArguments called with: --arg bj03-bi-pro-hdpnamenn:51568 --num-executors 4 --num-executors 4 --executor-memory 3g --executor-memory 3g --executor-cores 4 --executor-cores 4 --name Spark Pi

Therefore, to set the memory the AM requests, either use cluster mode, or in client mode set the spark.yarn.am.memory property manually with --conf, for example:

spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-client \
    --num-executors 4 \
    --driver-memory 2g \
    --executor-memory 3g \
    --executor-cores 4 \
    --conf spark.yarn.am.memory=1024m \
    /usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar \
    100000

Open the YARN web UI, and we can see that:

a. The Spark Pi application started 5 containers, using 18G of memory and 5 CPU cores.

b. YARN started one container for the AM, occupying 2048M of memory.

c. YARN started 4 containers to run tasks, each occupying 4096M of memory.

Why 2G + 4G * 4 = 18G? The first container requested only 2G because our program requested just 512m for the AM, while yarn.scheduler.minimum-allocation-mb forces a minimum of 2G per container. As for the remaining containers, we set executor-memory to 3G, so why does each container occupy 4096M?

To find the pattern, I ran several more tests, setting executor-memory to 3G, 4G, 5G, and 6G and recording the container memory allocated for each executor:

  • executor-memory=3g: 2G + 4G * 4 = 18G
  • executor-memory=4g: 2G + 6G * 4 = 26G
  • executor-memory=5g: 2G + 6G * 4 = 26G
  • executor-memory=6g: 2G + 8G * 4 = 34G

To answer this, I looked at the source code, following the class path org.apache.spark.deploy.yarn.ApplicationMaster -> YarnRMClient -> YarnAllocator, and found this code in YarnAllocator:

// Executor memory in MB.
protected val executorMemory = args.executorMemory
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
  math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
// Number of cores per executor.
protected val executorCores = args.executorCores
// Resource capability requested for each executors
private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)

Since I did not dig into the YARN source code, my guess is that the container size is computed from executorMemory + memoryOverhead, and that each container's size must be a multiple of yarn.scheduler.minimum-allocation-mb. With executor-memory=3g, executorMemory + memoryOverhead is 3G + 384M = 3456M, so the container to request is yarn.scheduler.minimum-allocation-mb * 2 = 4096m = 4G; the other cases follow the same rule.
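
Under that assumption, the measured container sizes can be reproduced with a small helper that adds the overhead and rounds up to a multiple of yarn.scheduler.minimum-allocation-mb (a sketch of my guess, not YARN's actual normalization code):

// Sketch of the guessed rule: container = roundUp(executorMemory + overhead, minAllocation).
// All values in MB; minAllocationMb corresponds to yarn.scheduler.minimum-allocation-mb (2048 here).
object ContainerSizeSketch {
  def containerSizeMb(executorMemoryMb: Int, minAllocationMb: Int = 2048): Int = {
    val overhead = math.max((0.07 * executorMemoryMb).toInt, 384)
    val requested = executorMemoryMb + overhead
    // round up to the next multiple of the YARN minimum allocation
    ((requested + minAllocationMb - 1) / minAllocationMb) * minAllocationMb
  }

  def main(args: Array[String]): Unit = {
    println(containerSizeMb(3072))   // 4096   -> 3g executor, matches the 4G measured above
    println(containerSizeMb(4096))   // 6144   -> 4g executor, matches 6G
    println(containerSizeMb(5120))   // 6144   -> 5g executor, matches 6G
    println(containerSizeMb(6144))   // 8192   -> 6g executor, matches 8G
    println(containerSizeMb(512))    // 2048   -> the AM container (512m + 384m overhead)
    println(containerSizeMb(92160))  // 100352 -> the 90g case discussed below (about 98G)
  }
}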

Notes:

  • Yarn always rounds up memory requirement to multiples of yarn.scheduler.minimum-allocation-mb, which by default is 1024 or 1GB.
  • Spark adds an overhead to SPARK_EXECUTOR_MEMORY/SPARK_DRIVER_MEMORY before asking Yarn for the amount.

Also pay attention to how memoryOverhead is computed: when executorMemory is large, memoryOverhead grows with it and is no longer 384m, so the container requests correspondingly more memory. For example, with executorMemory set to 90G, memoryOverhead is math.max(0.07 * 90G, 384m) = 6.3G, and the corresponding container request is 98G.

Back to why the AM's container is allocated 2G: 512 + 384 = 896, which is less than 2G, so 2G is allocated. You can set spark.yarn.am.memory and observe the result again.

Open the Spark web UI at http://ip:4040 to see the memory usage of the driver and the executors:

The figure shows that each executor occupies 1566.7 MB of memory. How is that computed? Referring to the article Spark on Yarn: Where Have All the Memory Gone?, totalExecutorMemory is computed as follows:

// yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
val MEMORY_OVERHEAD_FACTOR = 0.07
val MEMORY_OVERHEAD_MIN = 384

// yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
  math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
......
val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
  s"memory including $memoryOverhead MB overhead")

Here we set executor-memory to 3G, so memoryOverhead is math.max(0.07 * 3072, 384) = 384, and the maximum available storage memory is computed by the following code:

// core/src/main/scala/org/apache/spark/storage/BlockManager.scala
/** Return the total amount of storage memory available. */
private def getMaxMemory(conf: SparkConf): Long = {
  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}

That is, with executor-memory set to 3G, the executor's storage memory is roughly 3072m * 0.6 * 0.9 = 1658.88m. Note: strictly speaking the multiplier should be Runtime.getRuntime.maxMemory, which is somewhat less than 3072m.

The figure above shows the driver occupying 1060.3 MB. Here driver-memory is 2G, so the driver's storage memory is roughly 2048m * 0.6 * 0.9 = 1105.92m. Again, strictly speaking the multiplier should be Runtime.getRuntime.maxMemory, which is somewhat less than 2048m.
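
Both numbers can be checked with a tiny calculation mirroring getMaxMemory above (an approximation: the real code multiplies Runtime.getRuntime.maxMemory, which is a bit below the -Xmx value, which is why the observed 1566.7 MB and 1060.3 MB are slightly smaller than these estimates):

// Approximation: storage memory is roughly heap * spark.storage.memoryFraction * spark.storage.safetyFraction.
object StorageMemorySketch {
  def approxStorageMemoryMb(heapMb: Int,
                            memoryFraction: Double = 0.6,
                            safetyFraction: Double = 0.9): Double =
    heapMb * memoryFraction * safetyFraction

  def main(args: Array[String]): Unit = {
    println(approxStorageMemoryMb(3072)) // 1658.88 -> executor, observed 1566.7 MB
    println(approxStorageMemoryMb(2048)) // 1105.92 -> driver,   observed 1060.3 MB
  }
}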

Now look at how the CoarseGrainedExecutorBackend process was launched on a worker node:

$ jps
46841 Worker
21894 CoarseGrainedExecutorBackend
9345
21816 ExecutorLauncher
43369
24300 NodeManager
38012 JournalNode
36929 QuorumPeerMain
22909 Jps

$ ps -ef|grep 21894
nobody 21894 21892 99 17:28 ? 00:04:49 /usr/java/jdk1.7.0_71/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms3072m -Xmx3072m -Djava.io.tmpdir=/data/yarn/local/usercache/root/appcache/application_1433742899916_0069/container_1433742899916_0069_01_000003/tmp -Dspark.driver.port=60235 -Dspark.yarn.app.container.log.dir=/data/yarn/logs/application_1433742899916_0069/container_1433742899916_0069_01_000003 org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@bj03-bi-pro-hdpnamenn:60235/user/CoarseGrainedScheduler --executor-id 2 --hostname X.X.X.X --cores 4 --app-id application_1433742899916_0069 --user-class-path file:/data/yarn/local/usercache/root/appcache/application_1433742899916_0069/container_1433742899916_0069_01_000003/__app__.jar

Each CoarseGrainedExecutorBackend process is allocated 3072m of memory. To inspect the JVM of each executor, we can enable JMX by adding the following line to /etc/spark/conf/spark-defaults.conf:

spark.executor.extraJavaOptions -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false 

Then use jconsole to monitor the JVM heap, which makes it easier to tune the memory sizes.

Summary

To summarize: in client mode, the memory of the AM's container is determined by spark.yarn.am.memory plus spark.yarn.am.memoryOverhead; each executor's container request is the executor memory plus spark.yarn.executor.memoryOverhead; and the storage memory of the driver and the executors is roughly their heap size multiplied by 0.6 * 0.9 = 0.54 (spark.storage.memoryFraction * spark.storage.safetyFraction). In YARN, the memory a container requests is rounded up to a multiple of yarn.scheduler.minimum-allocation-mb.

下面這張圖展現了Spark on YARN 內存結構,圖片來自How-to: Tune Your Apache Spark Jobs (Part 2)

For the cluster-mode analysis, follow the same procedure as above. I hope this article helps!
