Spark開發經常使用參數

##Driver spark.driver.coreshtml

driver端分配的核數,默認爲1,thriftserver是啓動thriftserver服務的機器,資源充足的話能夠儘可能給多。java

spark.driver.memorynode

driver端分配的內存數,默認爲1g,同上。緩存

spark.driver.maxResultSize網絡

driver端接收的最大結果大小,默認1GB,最小1MB,設置0爲無限。 這個參數不建議設置的太大,若是要作數據可視化,更應該控制在20-30MB之內。過大會致使OOM。數據結構

spark.extraListeners併發

默認none,隨着SparkContext被建立而建立,用於監聽單參數、無參數構造函數的建立,並拋出異常。app

##Executor spark.executor.memory 每一個executor分配的內存數,默認1g,會受到yarn CDH的限制,和memoryOverhead相加 不能超過總內存限制。jvm

spark.executor.cores分佈式

每一個executor的核數,默認yarn下1核,standalone下爲全部可用的核。

spark.default.parallelism

默認RDD的分區數、並行數。 像reduceByKey和join等這種須要分佈式shuffle的操做中,最大父RDD的分區數;像parallelize之類沒有父RDD的操做,則取決於運行環境下得cluster manager: 若是爲單機模式,本機核數;集羣模式爲全部executor總核數與2中最大的一個。

spark.executor.heartbeatInterval

executor和driver心跳發送間隔,默認10s,必須遠遠小於spark.network.timeout

spark.files.fetchTimeout

從driver端執行SparkContext.addFile() 抓取添加的文件的超時時間,默認60s

spark.files.useFetchCache

默認true,若是設爲true,拉取文件時會在同一個application中本地持久化,被若干個executors共享。這使得當同一個主機下有多個executors時,執行任務效率提升。

spark.broadcast.blockSize

TorrentBroadcastFactory中的每個block大小,默認4m 過大會減小廣播時的並行度,太小會致使BlockManager 產生 performance hit.

spark.files.overwrite

默認false,是否在執行SparkContext.addFile() 添加文件時,覆蓋已有的內容有差別的文件。

spark.files.maxPartitionBytes

單partition中最多能容納的文件大小,單位Bytes 默認134217728 (128 MB)

spark.files.openCostInBytes

小文件合併閾值,小於該參數就會被合併到一個partition內。 默認4194304 (4 MB) 。這個參數在將多個文件放入一個partition時被用到,寧肯設置的小一些,由於在partition操做中,小文件確定會比大文件快。

spark.storage.memoryMapThreshold

從磁盤上讀文件時,最小單位不能少於該設定值,默認2m,小於或者接近操做系統的每一個page的大小。

##Shuffle spark.reducer.maxSizeInFlight

默認48m。從每一個reduce任務同時拉取的最大map數,每一個reduce都會在完成任務後,須要一個堆外內存的緩衝區來存放結果,若是沒有充裕的內存就儘量把這個調小一點。。相反,堆外內存充裕,調大些就能節省gc時間。

spark.reducer.maxBlocksInFlightPerAddress

限制了每一個主機每次reduce能夠被多少臺遠程主機拉取文件塊,調低這個參數能夠有效減輕node manager的負載。(默認值Int.MaxValue)

spark.reducer.maxReqsInFlight

限制遠程機器拉取本機器文件塊的請求數,隨着集羣增大,須要對此作出限制。不然可能會使本機負載過大而掛掉。。(默認值爲Int.MaxValue)

spark.reducer.maxReqSizeShuffleToMem

shuffle請求的文件塊大小 超過這個參數值,就會被強行落盤,防止一大堆併發請求把內存佔滿。(默認Long.MaxValue)

spark.shuffle.compress

是否壓縮map輸出文件,默認壓縮 true

spark.shuffle.spill.compress

shuffle過程當中溢出的文件是否壓縮,默認true,使用spark.io.compression.codec壓縮。

spark.shuffle.file.buffer

在內存輸出流中 每一個shuffle文件佔用內存大小,適當提升 能夠減小磁盤讀寫 io次數,初始值爲32k

spark.shuffle.memoryFraction

該參數表明了Executor內存中,分配給shuffle read task進行聚合操做的內存比例,默認是20%。 cache少且內存充足時,能夠調大該參數,給shuffle read的聚合操做更多內存,以免因爲內存不足致使聚合過程當中頻繁讀寫磁盤。

spark.shuffle.manager

當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。

當使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量。那麼此時就會自動啓用bypass機制,map-side就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。

spark.shuffle.consolidateFiles

若是使用HashShuffleManager,該參數有效。若是設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件,對於shuffle read task數量特別多的狀況下,這種方法能夠極大地減小磁盤IO開銷,提高性能。

若是的確不須要SortShuffleManager的排序機制,那麼除了使用bypass機制,還能夠嘗試將spark.shuffle.manager參數手動指定爲hash,使用HashShuffleManager,同時開啓consolidate機制。

spark.shuffle.io.maxRetries

shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,若是由於網絡異常致使拉取失敗,是會自動進行重試的。該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗。

對於那些包含了特別耗時的shuffle操做的做業,建議增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。

spark.shuffle.io.retryWait

同上,默認5s,建議加大間隔時長(好比60s),以增長shuffle操做的穩定性

##Compression and Serialization spark.broadcast.compress

廣播變量前是否會先進行壓縮。默認true (spark.io.compression.codec)

spark.io.compression.codec

壓縮RDD數據、日誌、shuffle輸出等的壓縮格式 默認lz4

spark.io.compression.lz4.blockSize

使用lz4壓縮時,每一個數據塊大小 默認32k

spark.rdd.compress

rdd是否壓縮 默認false,節省memory_cache大量內存 消耗更多的cpu資源(時間)。

spark.serializer.objectStreamReset

當使用JavaSerializer序列化時,會緩存對象防止寫多餘的數據,但這些對象就不會被gc,能夠輸入reset 清空緩存。默認緩存100個對象,修改爲-1則不緩存任何對象。

Memory Management

spark.memory.fraction

執行內存和緩存內存(堆)佔jvm總內存的比例,剩餘的部分是spark留給用戶存儲內部源數據、數據結構、異常大的結果數據。 默認值0.6,調小會致使頻繁gc,調大容易形成oom。

spark.memory.storageFraction

用於存儲的內存在堆中的佔比,默認0.5。調大會致使執行內存太小,執行數據落盤,影響效率;調小會致使緩存內存不夠,緩存到磁盤上去,影響效率。

值得一提的是在spark中,執行內存和緩存內存公用java堆,當執行內存沒有使用時,會動態分配給緩存內存使用,反之也是這樣。若是執行內存不夠用,能夠將存儲內存釋放移動到磁盤上(最多釋放不能超過本參數劃分的比例),但存儲內存不能把執行內存搶走。

spark.memory.offHeap.enabled

是否容許使用堆外內存來進行某些操做。默認false

spark.memory.offHeap.size

容許使用進行操做的堆外內存的大小,單位bytes 默認0

spark.cleaner.periodicGC.interval

控制觸發gc的頻率,默認30min

spark.cleaner.referenceTracking

是否進行context cleaning,默認true

spark.cleaner.referenceTracking.blocking

清理線程是否應該阻止清理任務,默認true

spark.cleaner.referenceTracking.blocking.shuffle

清理線程是否應該阻止shuffle的清理任務,默認false

spark.cleaner.referenceTracking.cleanCheckpoints

清理線程是否應該清理依賴超出範圍的檢查點文件(checkpoint files不知道怎麼翻譯。。)默認false

Networking

spark.rpc.message.maxSize

executors和driver間消息傳輸、map輸出的大小,默認128M。map多能夠考慮增長。

spark.driver.blockManager.port和spark.driver.bindAddress

driver端綁定監聽block manager的地址與端口。

spark.driver.host和spark.driver.port

driver端的ip和端口。

spark.network.timeout

網絡交互超時時間,默認120s。若是 spark.core.connection.ack.wait.timeout spark.storage.blockManagerSlaveTimeoutMs spark.shuffle.io.connectionTimeout spark.rpc.askTimeout orspark.rpc.lookupTimeout 沒有設置,那麼就以此參數爲準。

spark.port.maxRetries

設定了一個端口後,在放棄以前的最大重試次數,默認16。 會有一個預重試機制,每次會嘗試前一次嘗試的端口號+1的端口。如 設定了端口爲8000,則最終會嘗試8000~(8000+16)範圍的端口。

spark.rpc.numRetries

rpc任務在放棄以前的重試次數,默認3,即rpc task最多會執行3次。

spark.rpc.retry.wait

重試間隔,默認3s

spark.rpc.askTimeout

rpc任務超時時間,默認spark.network.timeout

spark.rpc.lookupTimeout

rpc任務查找時長

Scheduling

spark.scheduler.maxRegisteredResourcesWaitingTime

在執行前最大等待申請資源的時間,默認30s。

spark.scheduler.minRegisteredResourcesRatio

實際註冊的資源數佔預期須要的資源數的比例,默認0.8

spark.scheduler.mode

調度模式,默認FIFO 先進隊列先調度,能夠選擇FAIR。

spark.scheduler.revive.interval

work回覆重啓的時間間隔,默認1s

spark.scheduler.listenerbus.eventqueue.capacity

spark事件監聽隊列容量,默認10000,必須爲正值,增長可能會消耗更多內存

spark.blacklist.enabled

是否列入黑名單,默認false。若是設成true,當一個executor失敗好幾回時,會被列入黑名單,防止後續task派發到這個executor。能夠進一步調節spark.blacklist如下相關的參數: (均爲測試參數 Experimental) spark.blacklist.timeout spark.blacklist.task.maxTaskAttemptsPerExecutor spark.blacklist.task.maxTaskAttemptsPerNode spark.blacklist.stage.maxFailedTasksPerExecutor spark.blacklist.application.maxFailedExecutorsPerNode spark.blacklist.killBlacklistedExecutors spark.blacklist.application.fetchFailure.enabled

spark.speculation

推測,若是有task執行的慢了,就會從新執行它。默認false,

詳細相關配置以下: spark.speculation.interval

檢查task快慢的頻率,推測間隔,默認100ms。

spark.speculation.multiplier

推測比均值慢幾回算是task執行過慢,默認1.5

spark.speculation.quantile

在某個stage,完成度必須達到該參數的比例,才能被推測,默認0.75

spark.task.cpus

每一個task分配的cpu數,默認1

spark.task.maxFailures

在放棄這個job前容許的最大失敗次數,重試次數爲該參數-1,默認4

spark.task.reaper.enabled

賦予spark監控有權限去kill那些失效的task,默認false (原先有 job失敗了但一直顯示有task在running,總算找到這個參數了)

其餘進階的配置以下: spark.task.reaper.pollingInterval

輪詢被kill掉的task的時間間隔,若是還在running,就會打warn日誌,默認10s。

spark.task.reaper.threadDump

線程回收是是否產生日誌,默認true。

spark.task.reaper.killTimeout

當一個被kill的task過了多久還在running,就會把那個executor給kill掉,默認-1。

spark.stage.maxConsecutiveAttempts

在終止前,一個stage連續嘗試次數,默認4。

Dynamic Allocation

spark.dynamicAllocation.enabled

是否開啓動態資源配置,根據工做負載來衡量是否應該增長或減小executor,默認false

如下相關參數: spark.dynamicAllocation.minExecutors

動態分配最小executor個數,在啓動時就申請好的,默認0

spark.dynamicAllocation.maxExecutors

動態分配最大executor個數,默認infinity

spark.dynamicAllocation.initialExecutors

動態分配初始executor個數默認值=spark.dynamicAllocation.minExecutors

spark.dynamicAllocation.executorIdleTimeout

當某個executor空閒超過這個設定值,就會被kill,默認60s

spark.dynamicAllocation.cachedExecutorIdleTimeout

當某個緩存數據的executor空閒時間超過這個設定值,就會被kill,默認infinity

spark.dynamicAllocation.schedulerBacklogTimeout

任務隊列非空,資源不夠,申請executor的時間間隔,默認1s

spark.dynamicAllocation.sustainedSchedulerBacklogTimeout

同schedulerBacklogTimeout,是申請了新executor以後繼續申請的間隔,默認=schedulerBacklogTimeout

Spark Streaming

spark.streaming.stopGracefullyOnShutdown (true / false)默認fasle

確保在kill任務時,可以處理完最後一批數據,再關閉程序,不會發生強制kill致使數據處理中斷,沒處理完的數據丟失

spark.streaming.backpressure.enabled (true / false) 默認false

開啓後spark自動根據系統負載選擇最優消費速率

spark.streaming.backpressure.initialRate (整數) 默認直接讀取全部

在開啓反壓的狀況下,限制第一次批處理應該消費的數據,由於程序冷啓動隊列裏面有大量積壓,防止第一次所有讀取,形成系統阻塞

spark.streaming.kafka.maxRatePerPartition (整數) 默認直接讀取全部

限制每秒每一個消費線程讀取每一個kafka分區最大的數據量

spark.streaming.unpersist

自動將spark streaming產生的、持久化的數據給清理掉,默認true,自動清理內存垃圾。

spark.streaming.ui.retainedBatches spark streaming 日誌接口在gc時保留的batch個數,默認1000

原文出處:https://www.cnblogs.com/xiaodf/p/10861833.html

相關文章
相關標籤/搜索