Spark can be configured in three ways: through Spark properties, through environment variables, and through logging settings.
Spark properties are configured per application. They can be set directly on a SparkConf, either with its builder methods or with the generic set method.
下面展現了在本地機使用兩個線程併發執行的配置代碼:java
```scala
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CountingSheep")
val sc = new SparkContext(conf)
```
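Arbitrary properties can be attached to the same SparkConf through set. The values below are a minimal sketch chosen only to illustrate the unit suffixes discussed next, not tuning recommendations:

```scala
import org.apache.spark.SparkConf

// Illustrative values only, not tuning advice; pass this conf to SparkContext as above.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CountingSheep")
  .set("spark.executor.memory", "2g")     // byte-size properties take a size suffix
  .set("spark.network.timeout", "120s")   // duration properties take a time suffix
```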
Properties that specify a byte size must be given with a size unit, for example:
```
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
```
To avoid hard-coding properties in the application, you can create an empty SparkConf and pass the properties on the command line when submitting:
```bash
./bin/spark-submit \
  --name "My app" \
  --master local[4] \
  --conf spark.eventLog.enabled=false \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  myApp.jar
```
spark-shell and spark-submit support two ways of loading configuration dynamically: command-line options such as --conf above, and conf/spark-defaults.conf, in which each line consists of a key and a value separated by whitespace, for example:
```
spark.master             spark://5.6.7.8:7077
spark.executor.memory    4g
spark.eventLog.enabled   true
spark.serializer         org.apache.spark.serializer.KryoSerializer
```
These settings are merged at run time: properties set directly on the SparkConf in code take the highest priority, followed by flags passed to spark-submit, and finally values in the defaults file. For example, if spark-defaults.conf sets spark.executor.memory to 4g but the application calls set("spark.executor.memory", "2g"), the effective value is 2g.
The Environment tab of the web UI at http://<driver>:4040 lists the Spark configuration (only values coming from spark-defaults.conf, SparkConf, and command-line options are shown), so it can be used to check whether a property has taken effect.
Most configuration properties have sensible default values; the commonly used ones are listed below.
Property | Default | Description |
---|---|---|
spark.app.name | (none) | The name of your application; it appears in the logs and in the web UI. |
spark.driver.cores | 1 | Number of CPU cores used by the driver process. Only effective in cluster mode. |
spark.driver.maxResultSize | 1g | Limit on the total size of the results of each Spark action. The minimum value is 1M; 0 means unlimited. Jobs whose results exceed the limit are aborted, and setting the limit too high can cause out-of-memory errors in the driver. |
spark.driver.memory | 1g | Amount of memory to use for the driver process. Note: it cannot be set through SparkConf in code, because the driver has already started by then; use the --driver-memory command-line option or the properties file instead. |
spark.executor.memory | 1g | Amount of memory to use per executor (e.g. 2g, 8g). |
spark.extraListeners | (none) | A comma-separated list of classes implementing SparkListener; Spark's listener bus creates an instance of each class (see the sketch after this table). |
spark.local.dir | /tmp | Directory used for scratch space such as map output files and RDDs stored on disk; it is often placed on fast storage such as SSD, and multiple directories can be specified, separated by commas. Note: since Spark 1.0 this is overridden by the SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables. |
spark.logConf | false | Log the effective SparkConf as INFO when the SparkContext is started. |
spark.master | (none) | The cluster master URL to connect to. |
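For spark.extraListeners, the registered classes need a constructor Spark can call; a zero-argument constructor is enough. A minimal sketch of such a listener, with a hypothetical class name and log message:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical listener; register it via
//   --conf spark.extraListeners=<fully qualified class name>
// A zero-argument constructor is sufficient for Spark to instantiate it.
class JobEndLogger extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"Job ${jobEnd.jobId} finished with result ${jobEnd.jobResult}")
  }
}
```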
Property | Default | Description |
---|---|---|
spark.driver.userClassPathFirst | false | Whether user-added jars take precedence over Spark's own libraries when loading classes in the driver; used to work around version conflicts between user dependencies and Spark. |
spark.executor.logs.rolling.maxRetainedFiles | (none) | Maximum number of rolled-over executor log files retained by the system; older logs are deleted once the limit is exceeded. Disabled by default. |
spark.executor.logs.rolling.time.interval | daily | Time interval at which executor logs are rolled over. Rolling is disabled by default. |
spark.executor.userClassPathFirst | false | Same as above, but applied to executors: user-added jars take precedence over Spark's libraries, to work around version conflicts. |
spark.python.worker.memory | 512m | Memory limit of each Python worker process during aggregation; if the limit is exceeded, data is spilled to disk. |
Property | Default | Description |
---|---|---|
spark.reducer.maxSizeInFlight | 48m | Maximum size of map output fetched simultaneously by each reduce task. Since every fetch requires its own buffer, do not increase this unless the machines have plenty of memory. |
spark.shuffle.compress | true | Whether to compress map output files, using the codec given by spark.io.compression.codec. |
spark.shuffle.file.buffer | 32k | Size of the in-memory buffer of each shuffle file output stream. These buffers reduce the number of system I/O calls. |
spark.shuffle.manager | sort | Implementation used for shuffling data: sort or hash. Sort-based shuffle uses memory more efficiently and has been the default since version 1.2. |
spark.shuffle.service.enabled | false | Enables the external shuffle service, which preserves shuffle files written by executors so that executors can be removed safely. It must be enabled when spark.dynamicAllocation.enabled is true, and the external shuffle service has to be set up. |
spark.shuffle.service.port | 7337 | Port on which the external shuffle service runs. |
spark.shuffle.sort.bypassMergeThreshold | 200 | If the number of reduce partitions is below this threshold, sort-based shuffle does not merge-sort the data but writes each partition directly to its own file, much like hash-based shuffle; the difference is that these files are eventually merged into a single file, with an index file marking the offset of each partition. From the reducer's point of view, the data and index files look the same whether or not a merge sort took place, so this can be seen as sort-based shuffle's compromise toward hash-based shuffle for small shuffles. Like hash-based shuffle, it opens many files at once and thus increases memory usage, so lower this value if GC pressure is severe or memory is tight. |
spark.shuffle.spill.compress | true | If true, intermediate results are compressed when spilled to local disk and decompressed when read back for merging. Enabling it usually makes sense when disk I/O is the bottleneck; if the local disks are SSDs, disabling it may be the better choice. |
Property | Default | Description |
---|---|---|
spark.eventLog.compress | false | Whether to compress the event log; only meaningful when spark.eventLog.enabled is true. |
spark.eventLog.dir | file:///tmp/spark-events | Directory in which event logs are recorded; it can also be an HDFS path. |
spark.eventLog.enabled | false | Whether to record Spark events, used to reconstruct the web UI after the application has finished. |
spark.ui.killEnabled | true | Whether stages and their corresponding jobs can be killed from the web UI. |
spark.ui.port | 4040 | Port of the application's web UI, which shows application statistics. |
spark.ui.retainedJobs | 1000 | How many jobs the Spark UI and status APIs remember before garbage collecting. |
spark.ui.retainedStages | 1000 | How many stages the Spark UI and status APIs remember before garbage collecting. |
spark.worker.ui.retainedExecutors | 1000 | How many finished executors the worker UI and status APIs remember before garbage collecting. |
spark.worker.ui.retainedDrivers | 1000 | Same as above, for finished drivers. |
spark.sql.ui.retainedExecutions | 1000 | Same as above, for finished SQL executions. |
spark.streaming.ui.retainedBatches | 1000 | Same as above, for finished streaming batches. |
Property | Default | Description |
---|---|---|
spark.broadcast.compress | true | Whether to compress broadcast variables before sending them. |
spark.closure.serializer | org.apache.spark.serializer.JavaSerializer | Serializer used for closures; currently only the Java serializer is supported. |
spark.io.compression.codec | snappy | Codec used to compress internal data such as RDD partitions, broadcast variables and shuffle output. Three codecs are provided: lz4, lzf, and snappy. |
spark.io.compression.lz4.blockSize | 32k | Block size used in LZ4 compression. |
spark.io.compression.snappy.blockSize | 32k | Block size used in Snappy compression. |
spark.kryo.classesToRegister | (none) | When Kryo serialization is used, a comma-separated list of custom classes to register with Kryo (see the sketch after this table). |
spark.kryo.referenceTracking | true (false when using Spark SQL Thrift Server) | Whether to track references to the same object during serialization. This improves performance when the object graph contains multiple copies of the same object; if that never happens, disabling it can improve performance. |
spark.kryo.registrationRequired | false | Whether registration with Kryo is required. If true, Kryo throws an exception when serializing an unregistered class; if false, Kryo writes the class name alongside every object of an unregistered class, which hurts performance. |
spark.kryo.registrator | (none) | A custom KryoRegistrator class used to register classes with Kryo. |
spark.kryoserializer.buffer.max | 64m | Maximum size of the Kryo serialization buffer; it must be larger than any object you attempt to serialize. |
spark.kryoserializer.buffer | 64k | Initial size of the Kryo serialization buffer; it grows as needed up to spark.kryoserializer.buffer.max. |
spark.rdd.compress | false | Whether to compress serialized RDD partitions, trading extra CPU time for lower storage cost. |
spark.serializer | org.apache.spark.serializer.JavaSerializer | Class used to serialize objects; org.apache.spark.serializer.KryoSerializer is recommended. |
spark.serializer.objectStreamReset | 100 | When serializing with org.apache.spark.serializer.JavaSerializer, the stream caches objects to reduce I/O, which prevents them from being garbage collected; resetting the stream every this many objects flushes that cache. |
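A minimal sketch of switching to Kryo and registering application classes (the Sheep case class is hypothetical); registerKryoClasses is the programmatic counterpart of spark.kryo.classesToRegister:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application class, used only to have something to register.
case class Sheep(id: Long, name: String)

val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Sheep]))   // equivalent to spark.kryo.classesToRegister

val sc = new SparkContext(conf)
```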
Property | Default | Description |
---|---|---|
spark.memory.fraction | 0.75 | Fraction of the heap used for execution and storage. The lower it is, the more often spills and eviction of cached data occur. The remainder is reserved for Spark's internal metadata and user data structures; keeping the default is recommended. |
spark.memory.storageFraction | 0.5 | Fraction of the memory above that is reserved for storage (cached data); the larger it is, the less memory is left for execution. |
spark.memory.offHeap.enabled | false | If true, Spark attempts to use off-heap memory; spark.memory.offHeap.size must then be positive (see the sketch after this table). |
spark.memory.offHeap.size | 0 | Number of bytes of off-heap memory that may be used. |
spark.memory.useLegacyMode | false | Whether to use the legacy memory management mode. The following (deprecated) properties only take effect when this is true: spark.shuffle.memoryFraction, spark.storage.memoryFraction, spark.storage.unrollFraction. |
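As the table notes, the two off-heap properties must be set together; a minimal sketch with an illustrative size:

```scala
import org.apache.spark.SparkConf

// Illustrative: enable off-heap memory and give it 1 GiB (the size is in bytes).
val conf = new SparkConf()
  .setAppName("OffHeapExample")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", (1L << 30).toString)
```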
Property | Default | Description |
---|---|---|
spark.broadcast.blockSize | 4m | Size of each block used by TorrentBroadcastFactory. Too large a value reduces broadcast parallelism; too small a value can make the BlockManager a bottleneck. |
spark.broadcast.factory | org.apache.spark.broadcast.TorrentBroadcastFactory | Broadcast implementation to use. |
spark.cleaner.ttl | (infinite) | How long Spark remembers metadata; anything older is cleaned up. Useful for long-running applications such as Spark Streaming. Note that cached RDDs older than this are cleaned up as well. |
spark.executor.cores | 1 in YARN mode; all available cores in standalone mode | Number of CPU cores used by each executor. In standalone mode, setting this allows a worker with enough cores to run several executors of the same application; otherwise each application gets a single executor per worker. |
spark.default.parallelism | For reduceByKey and join, the largest number of partitions of the parent RDDs; for operations such as parallelize with no parent RDDs, it depends on the cluster manager: local mode uses the number of cores, Mesos fine-grained mode uses 8, and other managers use the total number of executor cores or 2, whichever is larger | Default number of partitions (parallelism). |
spark.executor.heartbeatInterval | 10s | Interval between heartbeats from each executor to the driver. |
spark.files.fetchTimeout | 60s | Timeout for fetching files added through SparkContext.addFile. |
spark.files.useFetchCache | true | If true, executors of the same application share a local cache when fetching files; if false, each executor fetches its own copy. |
spark.files.overwrite | false | Whether to overwrite the target file when it already exists. |
spark.hadoop.cloneConf | false | If true, a copy of the Hadoop Configuration object is made for every task. |
spark.hadoop.validateOutputSpecs | true | If true, saveAsHadoopFile checks whether the output directory already exists. Setting it to false suppresses the exception for existing output, but it is safer to delete old output manually through the Hadoop FileSystem API. The setting is ignored when recovering through a Spark Streaming StreamingContext checkpoint, since existing files must be overwritten during recovery. |
spark.storage.memoryMapThreshold | 2m | Minimum block size above which Spark memory-maps files when reading blocks from disk; memory-mapping blocks close to or smaller than the operating system page size has a high overhead. |
spark.externalBlockStore.blockManager | org.apache.spark.storage.TachyonBlockManager | External block manager used to store RDDs. The file system URL is set through spark.externalBlockStore.url. |
spark.externalBlockStore.baseDir | System.getProperty("java.io.tmpdir") | Directory in the external block store in which RDDs are stored. The file system URL is set through spark.externalBlockStore.url. |
spark.externalBlockStore.url | tachyon://localhost:19998 for Tachyon | URL of the external block store file system. |
Property | Default | Description |
---|---|---|
spark.akka.frameSize | 128 | Maximum message size in MB. It mainly limits the size of messages exchanged between the executors and the driver; increase it if you run jobs with many thousands of map and reduce tasks. |
spark.akka.heartbeat.interval | 1000s | Can be set to a very large value to effectively disable Akka's internal failure detector. A larger interval reduces network load; a smaller one lets Akka detect failures sooner. |
spark.akka.heartbeat.pauses | 6000s | Used together with spark.akka.heartbeat.interval: the acceptable heartbeat pause for the failure detector. |
spark.akka.threads | 4 | Number of actor threads used for communication. Worth increasing when the driver has many CPU cores. |
spark.akka.timeout | 100s | Communication timeout between Spark nodes. |
spark.blockManager.port | random | Port the block managers listen on. |
spark.broadcast.port | random | Port the driver's HTTP broadcast server listens on. |
spark.driver.host | (local hostname) | Hostname or IP address the driver listens on; used for communication with the master and the executors. |
spark.driver.port | random | Port the driver listens on. |
spark.executor.port | random | Port the executor listens on. |
spark.fileserver.port | random | Port the driver's HTTP file server listens on. |
spark.network.timeout | 120s | Default timeout for all network interactions. |
spark.port.maxRetries | 16 | Maximum number of retries when binding to a port. |
spark.replClassServer.port | random | Port the driver's HTTP class server listens on. |
spark.rpc.numRetries | 3 | Number of times an RPC task is retried. |
spark.rpc.retry.wait | 3s | How long an RPC ask operation waits between retries. |
spark.rpc.askTimeout | 120s | Timeout for RPC ask operations. |
spark.rpc.lookupTimeout | 120s | Timeout for RPC remote endpoint lookup operations. |
Property | Default | Description |
---|---|---|
spark.cores.max | (not set) | Maximum number of CPU cores an application may use across the whole cluster. If not set, standalone clusters use spark.deploy.defaultCores and Mesos uses all available cores. |
spark.locality.wait | 3s | How long to wait to launch a data-local task before falling back to a less-local node. If your tasks are long and locality is poor, it is usually better to increase this value. |
spark.locality.wait.node | spark.locality.wait | Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information). |
spark.locality.wait.process | spark.locality.wait | Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process. |
spark.locality.wait.rack | spark.locality.wait | Customize the locality wait for rack locality. |
spark.scheduler.maxRegisteredResourcesWaitingTime | 30s | Maximum amount of time to wait for resources to register before scheduling begins. |
spark.scheduler.mode | FIFO | Scheduling mode between jobs: FAIR for fair sharing or FIFO for first-in, first-out. |
spark.scheduler.revive.interval | 1s | The interval length for the scheduler to revive the worker resource offers to run tasks. |
spark.speculation | false | If true, speculative execution is enabled: slow tasks of a stage are relaunched on other nodes and the first copy to finish wins (see the sketch after this table). |
spark.speculation.interval | 100ms | How often Spark checks for tasks to speculate. |
spark.speculation.multiplier | 1.5 | How many times slower than the median task duration a task must be before it is considered for speculation. |
spark.speculation.quantile | 0.75 | Fraction of tasks that must be complete before speculation is enabled for a stage. |
spark.task.cpus | 1 | Number of CPU cores allocated to each task. |
spark.task.maxFailures | 4 | Number of task failures before the job is aborted; the number of retries is this value minus one. |
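A minimal sketch of enabling speculative execution with thresholds stricter than the defaults (the values are illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

// Illustrative values; the defaults are usually a reasonable starting point.
val conf = new SparkConf()
  .setAppName("SpeculationExample")
  .set("spark.speculation", "true")
  .set("spark.speculation.multiplier", "2")   // a task must be 2x slower than the median
  .set("spark.speculation.quantile", "0.9")   // wait until 90% of the tasks have finished
```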
Property | Default | Description |
---|---|---|
spark.dynamicAllocation.enabled | false | Whether to enable dynamic resource allocation (see the sketch after this table). |
spark.dynamicAllocation.executorIdleTimeout | 60s | If dynamic allocation is enabled and an executor has been idle for longer than this, the executor is removed. |
spark.dynamicAllocation.cachedExecutorIdleTimeout | infinity | If dynamic allocation is enabled and an executor holding cached data has been idle for longer than this, the executor is removed. |
spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | Initial number of executors when dynamic allocation is enabled. |
spark.dynamicAllocation.maxExecutors | infinity | Upper bound on the number of executors. |
spark.dynamicAllocation.minExecutors | 0 | Lower bound on the number of executors. |
spark.dynamicAllocation.schedulerBacklogTimeout | 1s | If dynamic allocation is enabled and there have been pending tasks backlogged for more than this duration, new executors will be requested. |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | schedulerBacklogTimeout | Same as spark.dynamicAllocation.schedulerBacklogTimeout, but used only for subsequent executor requests. |
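Putting the dynamic-allocation properties together with the external shuffle service they depend on, a minimal sketch might look as follows (the bounds are illustrative, and the shuffle service itself must also be running on the cluster nodes):

```scala
import org.apache.spark.SparkConf

// Illustrative bounds; spark.shuffle.service.enabled is required for dynamic allocation.
val conf = new SparkConf()
  .setAppName("DynamicAllocationExample")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "120s")
```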
Property | Default | Description |
---|---|---|
spark.acls.enable | false | Whether Spark acls should are enabled. If enabled, this checks to see if the user has access permissions to view or modify the job. Note this requires the user to be known, so if the user comes across as null no checks are done. Filters can be used with the UI to authenticate and set the user. |
spark.admin.acls | Empty | Comma separated list of users/administrators that have view and modify access to all Spark jobs. This can be used if you run on a shared cluster and have a set of administrators or devs who help debug when things work. Putting a 「*」 in the list means any user can have the priviledge of admin. |
spark.authenticate | false | Whether Spark authenticates its internal connections. See spark.authenticate.secret if not running on YARN. |
spark.authenticate.secret | None | Set the secret key used for Spark to authenticate between components. This needs to be set if not running on YARN and authentication is enabled. |
spark.authenticate.enableSaslEncryption | false | Enable encrypted communication when authentication is enabled. This option is currently only supported by the block transfer service. |
spark.network.sasl.serverAlwaysEncrypt | false | Disable unencrypted connections for services that support SASL authentication. This is currently supported by the external shuffle service. |
spark.core.connection.ack.wait.timeout | 60s | How long for the connection to wait for ack to occur before timing out and giving up. To avoid unwilling timeout caused by long pause like GC, you can set larger value. |
spark.core.connection.auth.wait.timeout | 30s | How long for the connection to wait for authentication to occur before timing out and giving up. |
spark.modify.acls | Empty | Comma separated list of users that have modify access to the Spark job. By default only the user that started the Spark job has access to modify it (kill it for example). Putting a 「*」 in the list means any user can have access to modify it. |
spark.ui.filters | None | Comma separated list of filter class names to apply to the Spark web UI. The filter should be a standard javax servlet Filter. Parameters to each filter can also be specified by setting a java system property of: spark.<class name of filter>.params='param1=value1,param2=value2'. For example: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing' |
spark.ui.view.acls | Empty | Comma separated list of users that have view access to the Spark web ui. By default only the user that started the Spark job has view access. Putting a 「*」 in the list means any user can have view access to this Spark job. |
Property | Default | Description |
---|---|---|
spark.ssl.enabled | false | Whether to enable SSL connections on all supported protocols. |
spark.ssl.enabledAlgorithms | Empty | A comma-separated list of ciphers; the ciphers must be supported by the JVM. |
spark.ssl.keyPassword | None | Password for the private key in the key store. |
spark.ssl.keyStore | None | Path to the key-store file; it can be absolute or relative to the directory in which the component is started. |
spark.ssl.keyStorePassword | None | A password to the key-store |
spark.ssl.protocol | None | A protocol name. The protocol must be supported by JVM. The reference list of protocols one can find on this page. |
spark.ssl.trustStore | None | A path to a trust-store file. The path can be absolute or relative to the directory where the component is started in. |
spark.ssl.trustStorePassword | None | A password to the trust-store. |
Property | Default | Description |
---|---|---|
spark.streaming.backpressure.enabled | false | Enables or disables Spark Streaming’s internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below). |
spark.streaming.blockInterval | 200ms | Interval at which data received by Spark Streaming receivers is chunked into blocks of data before storing them in Spark. Minimum recommended - 50 ms. See the performance tuning section in the Spark Streaming programing guide for more details. |
spark.streaming.receiver.maxRate | not set | Maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide in the Spark Streaming programing guide for mode details. |
spark.streaming.receiver.writeAheadLog.enable | false | Enable write ahead logs for receivers. All the input data received through receivers will be saved to write ahead logs that will allow it to be recovered after driver failures. See the deployment guide in the Spark Streaming programing guide for more details. |
spark.streaming.unpersist | true | Force RDDs generated and persisted by Spark Streaming to be automatically unpersisted from Spark’s memory. The raw input data received by Spark Streaming is also automatically cleared. Setting this to false will allow the raw data and persisted RDDs to be accessible outside the streaming application as they will not be cleared automatically. But it comes at the cost of higher memory usage in Spark. |
spark.streaming.stopGracefullyOnShutdown | false | If true, Spark shuts down the StreamingContext gracefully on JVM shutdown rather than immediately. |
spark.streaming.kafka.maxRatePerPartition | not set | Maximum rate (number of records per second) at which data will be read from each Kafka partition when using the new Kafka direct stream API. See the Kafka Integration guide for more details. |
spark.streaming.kafka.maxRetries | 1 | Maximum number of consecutive retries the driver will make in order to find the latest offsets on the leader of each partition (a default value of 1 means that the driver will make a maximum of 2 attempts). Only applies to the new Kafka direct stream API. |
spark.streaming.ui.retainedBatches | 1000 | How many batches the Spark Streaming UI and status APIs remember before garbage collecting. |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite | false | Whether to close the file after writing a write ahead log record on the driver. Set this to ‘true’ when you want to use S3 (or any file system that does not support flushing) for the metadata WAL on the driver. |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite | false | Whether to close the file after writing a write ahead log record on the receivers. Set this to ‘true’ when you want to use S3 (or any file system that does not support flushing) for the data WAL on the receivers. |
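A minimal sketch of a StreamingContext that turns on the backpressure and graceful-shutdown properties above (the two-second batch interval and the socket source are only placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder input source and batch interval; the two properties are the point here.
val conf = new SparkConf()
  .setAppName("BackpressureExample")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(conf, Seconds(2))
ssc.socketTextStream("localhost", 9999).count().print()

ssc.start()
ssc.awaitTermination()
```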
Property | Default | Description |
---|---|---|
spark.r.numRBackendThreads | 2 | Number of RPC handler threads used by the RBackend. |
spark.r.command | Rscript | Executable for executing R scripts in cluster modes for both driver and workers. |
spark.r.driver.command | spark.r.command | Executable for executing R scripts in client modes for driver. Ignored in cluster modes |
For the remaining properties, see https://spark.apache.org/docs/latest/configuration.html.
Some Spark settings can be configured through environment variables, which are read from conf/spark-env.sh. In standalone and Mesos modes this file can provide machine-specific information such as hostnames. Since spark-env.sh does not exist after installation, copy spark-env.sh.template and make sure the copy is executable.
The following variables are commonly set in spark-env.sh:
Environment variable | Description |
---|---|
JAVA_HOME | Location of the Java installation. |
PYSPARK_PYTHON | Python binary executable used for PySpark; the default is python2.7 if available, otherwise python. |
SPARKR_DRIVER_R | R binary executable used for the SparkR shell; the default is R. |
SPARK_LOCAL_IP | IP address of the machine to bind to. |
SPARK_PUBLIC_DNS | Hostname your Spark program advertises to other machines. |
In addition, there are options for setting up Spark standalone clusters, such as the amount of memory and the number of cores to use on each machine.
Spark uses log4j for logging, which can be configured through a conf/log4j.properties file.
By setting the SPARK_CONF_DIR environment variable, you can point Spark at a configuration directory other than the default SPARK_HOME/conf for files such as spark-defaults.conf, spark-env.sh, and log4j.properties.
To read and write HDFS from Spark, two Hadoop configuration files must be on Spark's classpath:
+ hdfs-site.xml: provides default behaviors for the HDFS client.
+ core-site.xml: sets the default file system name.
The location of these files varies across Hadoop distributions, but they are usually found in /etc/hadoop/conf. To make them visible to Spark, set the HADOOP_CONF_DIR variable in spark-env.sh to the directory that contains them.