Spark has supported YARN mode since version 0.6.0, and subsequent releases have gradually improved it.
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory containing the configuration files for the Hadoop cluster. Spark uses these configurations to write data to HDFS and to connect to the YARN ResourceManager. All files in this directory are distributed to the YARN cluster, so every container used by the application gets the same configuration. Configurations such as Java system properties or environment variables that are not managed by YARN should be set in the Spark application's own configuration instead.
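For example, both variables can be exported in the shell before running spark-submit. The /etc/hadoop/conf path below is only an illustrative assumption; point it at wherever your cluster's client-side configuration actually lives:
$ export HADOOP_CONF_DIR=/etc/hadoop/conf   # assumed location of core-site.xml, yarn-site.xml, etc.
$ export YARN_CONF_DIR=$HADOOP_CONF_DIR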
There are two deploy modes for launching Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process that is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
Unlike Spark standalone and Mesos modes, in which the master's address is specified in the --master parameter, in YARN mode the ResourceManager's address is picked up from the Hadoop configuration. Thus, the --master parameter is simply yarn.
To launch a Spark application in cluster mode:
$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
For example:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10
The example above starts a YARN client program which launches the default Application Master. SparkPi then runs as a child thread of the Application Master. The client periodically polls the Application Master for status updates and displays them in the console. The client exits once your application has finished running.
Launching a Spark application in client mode works the same way, except that cluster is replaced with client. For example, to run spark-shell in client mode:
$ ./bin/spark-shell --master yarn --deploy-mode client
In cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar will not work out of the box with files that are local to the client. To make files on the client available to SparkContext.addJar, include them with the --jars option in the launch command.
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars my-other-jar.jar,my-other-other-jar.jar \
my-main-jar.jar \
app_arg1 app_arg2
Running Spark on YARN requires a binary distribution of Spark built with YARN support. You can download one from the project website, or build it yourself.
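If you build from source, a sketch of a Maven build with YARN support for a Spark release of this era might look like the following; the Hadoop profile and version are assumptions that vary by release:
$ build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package   # profile names depend on your Spark/Hadoop versions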
Most of the configuration for Spark on YARN is the same as for the other deployment modes.
In YARN, executors and the application master run inside "containers". YARN offers two ways of handling container logs after an application has completed. If log aggregation is turned on (via the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted from the local machines. These logs can be viewed from any machine in the cluster with the yarn logs command:
yarn logs -applicationId <app ID>
The command above prints the contents of all log files from all containers of the given application. You can also view the container log files directly in HDFS using the HDFS shell or API; their directory can be found in the YARN configuration (yarn.nodemanager.remote-app-log-dir and yarn.nodemanager.remote-app-log-dir-suffix). The logs are also available on the Spark Web UI under the Executors tab. This requires that both the Spark history server and the MapReduce history server are running and that yarn.log.server.url is configured properly in yarn-site.xml. The log URL on the Spark history server UI redirects to the MapReduce history server, which displays the aggregated logs.
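For instance, assuming log aggregation is enabled and yarn.nodemanager.remote-app-log-dir is left at a common default of /tmp/logs (an assumption; check your yarn-site.xml), the aggregated files can be browsed directly with the HDFS shell:
$ hdfs dfs -ls /tmp/logs/<user>/logs/<app ID>   # exact layout depends on remote-app-log-dir and its suffix setting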
When log aggregation is turned off, logs are retained locally on each machine under YARN_APP_LOGS_DIR, which is usually configured as /tmp/logs or $HADOOP_HOME/logs/userlogs, depending on the Hadoop version and installation. Viewing the logs for a container requires going to the host that contains them and looking in this directory; subdirectories are organized by application ID and container ID. These logs are also available on the Spark Web UI under the Executors tab and do not require the MapReduce history server, since nothing needs to be read from HDFS.
To review the per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, JARs, and all environment variables used for launching each container. It is particularly useful for debugging classpath problems. (Note that enabling this requires admin privileges on the cluster settings and a restart of all node managers, so it is not applicable to hosted clusters.)
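As a sketch, if yarn.nodemanager.local-dirs were set to /hadoop/yarn/local (an assumed value), the application cache for a given application could be inspected on a NodeManager host with:
$ ls /hadoop/yarn/local/usercache/<user>/appcache/<app ID>/   # per-container subdirectories; exact layout depends on the YARN version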
To use a custom log4j configuration for the application master or the executors:
- Upload a custom log4j.properties with spark-submit, by adding it to the --files list of files to be uploaded with the application.
- Update the $SPARK_CONF_DIR/log4j.properties file and it will be automatically uploaded along with the other configurations. Note that the option above has higher priority than this one if both are specified.

Note that for the first option, both the executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).
If you need a reference to the proper location to put log files so that YARN can properly display and aggregate them, use spark.yarn.app.container.log.dir in your log4j.properties. For example: log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log. For streaming applications, configuring a RollingFileAppender and setting the file location to YARN's log directory will avoid disk overflow caused by large log files, and the logs can still be accessed with YARN's log tools.
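Tying this together, one possible way to ship a custom log4j configuration is to add it to --files. This is only a sketch, reusing the class and jar names from the earlier example and assuming a customized log4j.properties file sits in the current directory:
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--files log4j.properties \
my-main-jar.jar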
Property Name | Default | Meaning |
---|---|---|
spark.yarn.am.memory | 512m | Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). In cluster mode, use spark.driver.memory instead. Use lower-case suffixes, e.g. |
spark.driver.cores | 1 | Number of cores used by the driver in YARN cluster mode. Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN Application Master. In client mode, use spark.yarn.am.cores to control the number of cores used by the YARN Application Master instead. |
spark.yarn.am.cores | 1 | Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead. |
spark.yarn.am.waitTime | 100s | In cluster mode, time for the YARN Application Master to wait for the SparkContext to be initialized. In client mode, time for the YARN Application Master to wait for the driver to connect to it. |
spark.yarn.submit.file.replication | The default HDFS replication (usually 3) | HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives. |
spark.yarn.preserve.staging.files | false | Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them. |
spark.yarn.scheduler.heartbeat.interval-ms | 3000 | The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. The value is capped at half the value of YARN's configuration for the expiry interval, i.e. yarn.am.liveness-monitor.expiry-interval-ms. |
spark.yarn.scheduler.initial-allocation.interval | 200ms | The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager when there are pending container allocation requests. It should be no larger than spark.yarn.scheduler.heartbeat.interval-ms. The allocation interval will be doubled on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms is reached. |
spark.yarn.max.executor.failures | numExecutors * 2, with minimum of 3 | The maximum number of executor failures before failing the application. |
spark.yarn.historyServer.address | (none) | The address of the Spark history server, e.g. host.com:18080. The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI. For this property, YARN properties can be used as variables, and these are substituted by Spark at runtime. For example, if the Spark history server runs on the same node as the YARN ResourceManager, it can be set to ${hadoopconf-yarn.resourcemanager.hostname}:18080. |
spark.yarn.dist.archives | (none) | Comma-separated list of archives to be extracted into the working directory of each executor. |
spark.yarn.dist.files | (none) | Comma-separated list of files to be placed in the working directory of each executor. |
spark.executor.instances | 2 | The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled. If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamic allocation is turned off and the specified number of spark.executor.instances is used. |
spark.yarn.executor.memoryOverhead | executorMemory * 0.10, with minimum of 384 | The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). |
spark.yarn.driver.memoryOverhead | driverMemory * 0.10, with minimum of 384 | The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). |
spark.yarn.am.memoryOverhead | AM memory * 0.10, with minimum of 384 | Same as spark.yarn.driver.memoryOverhead, but for the YARN Application Master in client mode. |
spark.yarn.am.port | (random) | Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend. |
spark.yarn.queue | default | The name of the YARN queue to which the application is submitted. |
spark.yarn.jar | (none) | The location of the Spark jar file, in case overriding the default location is desired. By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a jar on HDFS, for example, set this configuration to hdfs:///some/path. |
spark.yarn.access.namenodes | (none) | A comma-separated list of secure HDFS namenodes your Spark application is going to access. For example, spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032. The Spark application must have access to the namenodes listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters. |
spark.yarn.appMasterEnv.[EnvironmentVariableName] | (none) | Add the environment variable specified by EnvironmentVariableName to the Application Master process launched on YARN. The user can specify multiple of these to set multiple environment variables. In cluster mode this controls the environment of the Spark driver and in client mode it only controls the environment of the executor launcher. |
spark.yarn.containerLauncherMaxThreads | 25 | The maximum number of threads to use in the YARN Application Master for launching executor containers. |
spark.yarn.am.extraJavaOptions | (none) | A string of extra JVM options to pass to the YARN Application Master in client mode. In cluster mode, use spark.driver.extraJavaOptions instead. |
spark.yarn.am.extraLibraryPath | (none) | Set a special library path to use when launching the YARN Application Master in client mode. |
spark.yarn.maxAppAttempts | yarn.resourcemanager.am.max-attempts in YARN | The maximum number of attempts that will be made to submit the application. It should be no larger than the global number of max attempts in the YARN configuration. |
spark.yarn.am.attemptFailuresValidityInterval | (none) | Defines the validity interval for AM failure tracking. If the AM has been running for at least the defined interval, the AM failure count will be reset. This feature is not enabled if not configured, and only supported in Hadoop 2.6+. |
spark.yarn.submit.waitAppCompletion | true | In YARN cluster mode, controls whether the client waits to exit until the application completes. If set to true, the client process will stay alive reporting the application's status. Otherwise, the client process will exit after submission. |
spark.yarn.am.nodeLabelExpression | (none) | A YARN node label expression that restricts the set of nodes AM will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored. |
spark.yarn.executor.nodeLabelExpression | (none) | A YARN node label expression that restricts the set of nodes executors will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored. |
spark.yarn.tags | (none) | Comma-separated list of strings to pass through as YARN application tags appearing in YARN ApplicationReports, which can be used for filtering when querying YARN apps. |
spark.yarn.keytab | (none) | The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the YARN Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically. (Works also with the "local" master) |
spark.yarn.principal | (none) | Principal to be used to login to KDC, while running on secure HDFS. (Works also with the "local" master) |
spark.yarn.config.gatewayPath | (none) | A path that is valid on the gateway host (the host where a Spark application is started) but may differ for paths for the same resource in other nodes in the cluster. Coupled with spark.yarn.config.replacementPath, this is used to support clusters with heterogeneous configurations, so that Spark can correctly launch remote processes. The replacement path normally will contain a reference to some environment variable exported by YARN (and, thus, visible to Spark containers). For example, if the gateway node has Hadoop libraries installed on |
spark.yarn.config.replacementPath | (none) | See spark.yarn.config.gatewayPath. |
spark.yarn.security.tokens.${service}.enabled |  | Controls whether to retrieve delegation tokens for non-HDFS services when security is enabled. By default, delegation tokens for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run. Currently supported services are: |
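Like any Spark property, the settings above can be supplied per submission with --conf (or placed in conf/spark-defaults.conf). A sketch reusing the SparkPi example; the chosen properties and values here are arbitrary illustrations:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.queue=thequeue \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.submit.waitAppCompletion=false \
lib/spark-examples*.jar \
10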