本文主要了解Spark On YARN部署模式下的內存分配狀況,由於沒有深刻研究Spark的源代碼,因此只能根據日誌去看相關的源代碼,從而瞭解「爲何會這樣,爲何會那樣」。html
按照Spark應用程序中的driver分佈方式不一樣,Spark on YARN有兩種模式: yarn-client
模式、yarn-cluster
模式。java
當在YARN上運行Spark做業,每一個Spark executor做爲一個YARN容器運行。Spark可使得多個Tasks在同一個容器裏面運行。node
下圖是yarn-cluster模式的做業執行圖,圖片來源於網絡:apache
關於Spark On YARN相關的配置參數,請參考Spark配置參數。本文主要討論內存分配狀況,因此只須要關注如下幾個心裏相關的參數:網絡
spark.driver.memory
:默認值512mspark.executor.memory
:默認值512mspark.yarn.am.memory
:默認值512mspark.yarn.executor.memoryOverhead
:值爲executorMemory * 0.07, with minimum of 384
spark.yarn.driver.memoryOverhead
:值爲driverMemory * 0.07, with minimum of 384
spark.yarn.am.memoryOverhead
:值爲AM memory * 0.07, with minimum of 384
注意:併發
--executor-memory/spark.executor.memory
控制 executor 的堆的大小,可是 JVM 自己也會佔用必定的堆空間,好比內部的 String 或者直接 byte buffer,spark.yarn.XXX.memoryOverhead
屬性決定向 YARN 請求的每一個 executor 或dirver或am 的額外堆內存大小,默認值爲 max(384, 0.07 * spark.executor.memory
)另外,由於任務是提交到YARN上運行的,因此YARN中有幾個關鍵參數,參考YARN的內存和CPU配置:app
yarn.app.mapreduce.am.resource.mb
:AM可以申請的最大內存,默認值爲1536MByarn.nodemanager.resource.memory-mb
:nodemanager可以申請的最大內存,默認值爲8192MByarn.scheduler.minimum-allocation-mb
:調度時一個container可以申請的最小資源,默認值爲1024MByarn.scheduler.maximum-allocation-mb
:調度時一個container可以申請的最大資源,默認值爲8192MBSpark集羣測試環境爲:less
注意:YARN集羣部署在Spark集羣之上的,每個worker節點上同時部署了一個NodeManager,而且YARN集羣中的配置以下:eclipse
<property> <name>yarn.nodemanager.resource.memory-mb</name> <value>106496</value> <!-- 104G --> </property> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>106496</value> </property> <property> <name>yarn.app.mapreduce.am.resource.mb</name> <value>2048</value> </property>
將spark的日誌基本調爲DEBUG,並將log4j.logger.org.apache.hadoop設置爲WARN建設沒必要要的輸出,修改/etc/spark/conf/log4j.properties:jvm
# Set everything to be logged to the console log4j.rootCategory=DEBUG, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.apache.hadoop=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
接下來是運行測試程序,以官方自帶的SparkPi例子爲例,下面主要測試client模式,至於cluster模式請參考下面的過程
。運行下面命令:
spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn-client \ --num-executors 4 \ --driver-memory 2g \ --executor-memory 3g \ --executor-cores 4 \ /usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar \ 100000
觀察輸出日誌(無關的日誌被略去):
15/06/08 13:57:01 INFO SparkContext: Running Spark version 1.3.0
15/06/08 13:57:02 INFO SecurityManager: Changing view acls to: root
15/06/08 13:57:02 INFO SecurityManager: Changing modify acls to: root
15/06/08 13:57:03 INFO MemoryStore: MemoryStore started with capacity 1060.3 MB
15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: ClientArguments called with: --arg bj03-bi-pro-hdpnamenn:51568 --num-executors 4 --num-executors 4 --executor-memory 3g --executor-memory 3g --executor-cores 4 --executor-cores 4 --name Spark Pi
15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: [actor] handled message (24.52531 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#864850679]
15/06/08 13:57:05 INFO Client: Requesting a new application from cluster with 4 NodeManagers
15/06/08 13:57:05 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (106496 MB per container)
15/06/08 13:57:05 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/06/08 13:57:05 INFO Client: Setting up container launch context for our AM
15/06/08 13:57:07 DEBUG Client: ===============================================================================
15/06/08 13:57:07 DEBUG Client: Yarn AM launch context:
15/06/08 13:57:07 DEBUG Client: user class: N/A
15/06/08 13:57:07 DEBUG Client: env:
15/06/08 13:57:07 DEBUG Client: CLASSPATH -> <CPS>/__spark__.jar<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/*<CPS>$HADOOP_COMMON_HOME/lib/*<CPS>$HADOOP_HDFS_HOME/*<CPS>$HADOOP_HDFS_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/*<CPS>$HADOOP_MAPRED_HOME/lib/*<CPS>$HADOOP_YARN_HOME/*<CPS>$HADOOP_YARN_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>:/usr/lib/spark/lib/spark-assembly.jar::/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
15/06/08 13:57:07 DEBUG Client: SPARK_DIST_CLASSPATH -> :/usr/lib/spark/lib/spark-assembly.jar::/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_CACHE_FILES_FILE_SIZES -> 97237208
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_STAGING_DIR -> .sparkStaging/application_1433742899916_0001
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_CACHE_FILES_VISIBILITIES -> PRIVATE
15/06/08 13:57:07 DEBUG Client: SPARK_USER -> root
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_MODE -> true
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_CACHE_FILES_TIME_STAMPS -> 1433743027399
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_CACHE_FILES -> hdfs://mycluster:8020/user/root/.sparkStaging/application_1433742899916_0001/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar#__spark__.jar
15/06/08 13:57:07 DEBUG Client: resources:
15/06/08 13:57:07 DEBUG Client: __spark__.jar -> resource { scheme: "hdfs" host: "mycluster" port: 8020 file: "/user/root/.sparkStaging/application_1433742899916_0001/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar" } size: 97237208 timestamp: 1433743027399 type: FILE visibility: PRIVATE
15/06/08 13:57:07 DEBUG Client: command:
15/06/08 13:57:07 DEBUG Client: /bin/java -server -Xmx512m -Djava.io.tmpdir=/tmp '-Dspark.eventLog.enabled=true' '-Dspark.executor.instances=4' '-Dspark.executor.memory=3g' '-Dspark.executor.cores=4' '-Dspark.driver.port=51568' '-Dspark.serializer=org.apache.spark.serializer.KryoSerializer' '-Dspark.driver.appUIAddress=http://bj03-bi-pro-hdpnamenn:4040' '-Dspark.executor.id=<driver>' '-Dspark.kryo.classesToRegister=scala.collection.mutable.BitSet,scala.Tuple2,scala.Tuple1,org.apache.spark.mllib.recommendation.Rating' '-Dspark.driver.maxResultSize=8g' '-Dspark.jars=file:/usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar' '-Dspark.driver.memory=2g' '-Dspark.eventLog.dir=hdfs://mycluster:8020/user/spark/applicationHistory' '-Dspark.app.name=Spark Pi' '-Dspark.fileserver.uri=http://X.X.X.X:49172' '-Dspark.tachyonStore.folderName=spark-81ae0186-8325-40f2-867b-65ee7c922357' -Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.deploy.yarn.ExecutorLauncher --arg 'bj03-bi-pro-hdpnamenn:51568' --executor-memory 3072m --executor-cores 4 --num-executors 4 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
15/06/08 13:57:07 DEBUG Client: ===============================================================================
從Will allocate AM container, with 896 MB memory including 384 MB overhead
日誌能夠看到,AM佔用了896 MB
內存,除掉384 MB
的overhead內存,實際上只有512 MB
,即spark.yarn.am.memory
的默認值,另外能夠看到YARN集羣有4個NodeManager,每一個container最多有106496 MB內存。
Yarn AM launch context啓動了一個Java進程,設置的JVM內存爲512m
,見/bin/java -server -Xmx512m
。
這裏爲何會取默認值呢?查看打印上面這行日誌的代碼,見org.apache.spark.deploy.yarn.Client:
private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = { val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() logInfo("Verifying our application has not requested more than the maximum " + s"memory capability of the cluster ($maxMem MB per container)") val executorMem = args.executorMemory + executorMemoryOverhead if (executorMem > maxMem) { throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } val amMem = args.amMemory + amMemoryOverhead if (amMem > maxMem) { throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" + s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( amMem, amMemoryOverhead)) }
args.amMemory來自ClientArguments類,這個類中會校驗輸出參數:
private def validateArgs(): Unit = { if (numExecutors <= 0) { throw new IllegalArgumentException( "You must specify at least 1 executor!\n" + getUsageMessage()) } if (executorCores < sparkConf.getInt("spark.task.cpus", 1)) { throw new SparkException("Executor cores must not be less than " + "spark.task.cpus.") } if (isClusterMode) { for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) { if (sparkConf.contains(key)) { println(s"$key is set but does not apply in cluster mode.") } } amMemory = driverMemory amCores = driverCores } else { for (key <- Seq(driverMemOverheadKey, driverCoresKey)) { if (sparkConf.contains(key)) { println(s"$key is set but does not apply in client mode.") } } sparkConf.getOption(amMemKey) .map(Utils.memoryStringToMb) .foreach { mem => amMemory = mem } sparkConf.getOption(amCoresKey) .map(_.toInt) .foreach { cores => amCores = cores } } }
從上面代碼能夠看到當 isClusterMode 爲true時,則args.amMemory值爲driverMemory的值;不然,則從spark.yarn.am.memory
中取,若是沒有設置該屬性,則取默認值512m。isClusterMode 爲true的條件是 userClass 不爲空,def isClusterMode: Boolean = userClass != null
,即輸出參數須要有--class
參數,而從下面日誌能夠看到ClientArguments的輸出參數中並無該參數。
15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: ClientArguments called with: --arg bj03-bi-pro-hdpnamenn:51568 --num-executors 4 --num-executors 4 --executor-memory 3g --executor-memory 3g --executor-cores 4 --executor-cores 4 --name Spark Pi
故,要想設置AM申請的內存值,要麼使用cluster模式,要麼在client模式中,是有--conf
手動設置spark.yarn.am.memory
屬性,例如:
spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn-client \ --num-executors 4 \ --driver-memory 2g \ --executor-memory 3g \ --executor-cores 4 \ --conf spark.yarn.am.memory=1024m \ /usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar \ 100000
打開YARN管理界面,能夠看到:
a. Spark Pi 應用啓動了5個Container,使用了18G內存、5個CPU core
b. YARN爲AM啓動了一個Container,佔用內存爲2048M
c. YARN啓動了4個Container運行任務,每個Container佔用內存爲4096M
爲何會是2G +4G *4=18G
呢?第一個Container只申請了2G內存,是由於咱們的程序只爲AM申請了512m內存,而yarn.scheduler.minimum-allocation-mb
參數決定了最少要申請2G內存。至於其他的Container,咱們設置了executor-memory內存爲3G,爲何每個Container佔用內存爲4096M呢?
爲了找出規律,多測試幾組數據,分別測試並收集executor-memory爲3G、4G、5G、6G時每一個executor對應的Container內存申請狀況:
關於這個問題,我是查看源代碼,根據org.apache.spark.deploy.yarn.ApplicationMaster -> YarnRMClient -> YarnAllocator的類查找路徑找到YarnAllocator中有這樣一段代碼:
// Executor memory in MB. protected val executorMemory = args.executorMemory // Additional memory overhead. protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) // Number of cores per executor. protected val executorCores = args.executorCores // Resource capability requested for each executors private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
由於沒有具體的去看YARN的源代碼,因此這裏猜想Container的大小是根據executorMemory + memoryOverhead
計算出來的,大概的規則是每個Container的大小必須爲yarn.scheduler.minimum-allocation-mb
值的整數倍,當executor-memory=3g
時,executorMemory + memoryOverhead
爲3G+384M=3456M,須要申請的Container大小爲yarn.scheduler.minimum-allocation-mb
* 2 =4096m=4G,其餘依此類推。
注意:
- Yarn always rounds up memory requirement to multiples of
yarn.scheduler.minimum-allocation-mb
, which by default is 1024 or 1GB.- Spark adds an
overhead
toSPARK_EXECUTOR_MEMORY/SPARK_DRIVER_MEMORY
before asking Yarn for the amount.
另外,須要注意memoryOverhead的計算方法,當executorMemory的值很大時,memoryOverhead的值相應會變大,這個時候就不是384m了,相應的Container申請的內存值也變大了,例如:當executorMemory設置爲90G時,memoryOverhead值爲math.max(0.07 * 90G, 384m)=6.3G
,其對應的Container申請的內存爲98G。
回頭看看給AM對應的Container分配2G內存緣由,512+384=896,小於2G,故分配2G,你能夠在設置spark.yarn.am.memory
的值以後再來觀察。
打開Spark的管理界面 http://ip:4040 ,能夠看到driver和Executor中內存的佔用狀況:
從上圖能夠看到Executor佔用了1566.7 MB內存,這是怎樣計算出來的?參考Spark on Yarn: Where Have All the Memory Gone?這篇文章,totalExecutorMemory的計算方式爲:
//yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala val MEMORY_OVERHEAD_FACTOR = 0.07 val MEMORY_OVERHEAD_MIN = 384 //yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) ...... val totalExecutorMemory = executorMemory + memoryOverhead numPendingAllocate.addAndGet(missing) logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + s"memory including $memoryOverhead MB overhead")
這裏咱們給executor-memory設置的3G內存,memoryOverhead的值爲math.max(0.07 * 3072, 384)=384
,其最大可用內存經過下面代碼來計算:
//core/src/main/scala/org/apache/spark/storage/BlockManager.scala /** Return the total amount of storage memory available. */ private def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong }
即,對於executor-memory設置3G時,executor內存佔用大約爲 3072m * 0.6 * 0.9 = 1658.88m,注意:其實是應該乘以Runtime.getRuntime.maxMemory
的值,該值小於3072m。
上圖中driver佔用了1060.3 MB,此時driver-memory的值是位2G,故driver中存儲內存佔用爲:2048m * 0.6 * 0.9 =1105.92m,注意:其實是應該乘以Runtime.getRuntime.maxMemory
的值,該值小於2048m。
這時候,查看worker節點CoarseGrainedExecutorBackend進程啓動腳本:
$ jps 46841 Worker 21894 CoarseGrainedExecutorBackend 9345 21816 ExecutorLauncher 43369 24300 NodeManager 38012 JournalNode 36929 QuorumPeerMain 22909 Jps $ ps -ef|grep 21894 nobody 21894 21892 99 17:28 ? 00:04:49 /usr/java/jdk1.7.0_71/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms3072m -Xmx3072m -Djava.io.tmpdir=/data/yarn/local/usercache/root/appcache/application_1433742899916_0069/container_1433742899916_0069_01_000003/tmp -Dspark.driver.port=60235 -Dspark.yarn.app.container.log.dir=/data/yarn/logs/application_1433742899916_0069/container_1433742899916_0069_01_000003 org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@bj03-bi-pro-hdpnamenn:60235/user/CoarseGrainedScheduler --executor-id 2 --hostname X.X.X.X --cores 4 --app-id application_1433742899916_0069 --user-class-path file:/data/yarn/local/usercache/root/appcache/application_1433742899916_0069/container_1433742899916_0069_01_000003/__app__.jar
能夠看到每一個CoarseGrainedExecutorBackend進程分配的內存爲3072m,若是咱們想查看每一個executor的jvm運行狀況,能夠開啓jmx。在/etc/spark/conf/spark-defaults.conf中添加下面一行代碼:
spark.executor.extraJavaOptions -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
而後,經過jconsole監控jvm堆內存運行狀況,這樣方便調試內存大小。
由上可知,在client模式下,AM對應的Container內存由spark.yarn.am.memory
加上spark.yarn.am.memoryOverhead
來肯定,executor加上spark.yarn.executor.memoryOverhead
的值以後肯定對應Container須要申請的內存大小,driver和executor的內存加上spark.yarn.driver.memoryOverhead
或spark.yarn.executor.memoryOverhead
的值以後再乘以0.54肯定storage memory內存大小。在YARN中,Container申請的內存大小必須爲yarn.scheduler.minimum-allocation-mb
的整數倍。
下面這張圖展現了Spark on YARN 內存結構,圖片來自How-to: Tune Your Apache Spark Jobs (Part 2):
至於cluster模式下的分析,請參考上面的過程。但願這篇文章對你有所幫助!