Master默認使用512M內存,當集羣中運行的任務特別多時,就會掛掉,緣由是master會讀取每一個task的event log日誌去生成Sparkui,內存不足天然會OOM,能夠在master的運行日誌中看到,經過HA啓動的master天然也會由於這個緣由失敗。java
增長Master的內存佔用,在Master節點spark-env.sh
中設置:node
export SPARK_DAEMON_MEMORY 10g # 根據你的實際狀況
減小保存在Master內存中的做業信息python
spark.ui.retainedJobs 500 # 默認都是1000 spark.ui.retainedStages 500
有時候咱們還會在web ui中看到worker節點消失或處於dead狀態,在該節點運行的任務則會報各類 lost worker
的錯誤,引起緣由和上述大致相同,worker內存中保存了大量的ui信息致使gc時失去和master之間的心跳。mysql
增長Master的內存佔用,在Worker節點spark-env.sh
中設置:git
export SPARK_DAEMON_MEMORY 2g # 根據你的實際狀況
減小保存在Worker內存中的Driver,Executor信息github
spark.worker.ui.retainedExecutors 200 # 默認都是1000 spark.worker.ui.retainedDrivers 200
Spark Shuffle FetchFailedException解決方案web
missing output locationsql
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
shuffle fetch faild數據庫
org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/192.168.47.215:50268
當前的配置爲每一個executor使用1core,5GRAM,啓動了20個executorapache
這種問題通常發生在有大量shuffle操做的時候,task不斷的failed,而後又重執行,一直循環下去,直到application失敗。
通常遇到這種問題提升executor內存便可,同時增長每一個executor的cpu,這樣不會減小task並行度。
啓動的execuote數量爲:7個
execuoterNum = spark.cores.max/spark.executor.cores
每一個executor的配置:
3core,15G RAM
消耗的內存資源爲:105G RAM
15G*7=105G
能夠發現使用的資源並無提高,可是一樣的任務原來的配置跑幾個小時還在卡着,改了配置後幾分鐘就能完成。
executor lost
WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost)
task lost
WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed
各類timeout
java.util.concurrent.TimeoutException: Futures timed out after [120 second] ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network. timeout if this is wrong
由網絡或者gc引發,worker或executor沒有接收到executor或task的心跳反饋。
提升 spark.network.timeout
的值,根據狀況改爲300(5min)或更高。
默認爲 120(120s),配置全部網絡傳輸的延時,若是沒有主動設置如下參數,默認覆蓋其屬性
數據傾斜
任務傾斜
差距不大的幾個task,有的運行速度特別慢。
大多數任務都完成了,還有那麼一兩個任務怎麼都跑不完或者跑的很慢,分爲數據傾斜和task傾斜兩種。
數據傾斜
數據傾斜大多數狀況是因爲大量的無效數據引發,好比null或者」「,也有多是一些異常數據,好比統計用戶登陸狀況時,出現某用戶登陸過千萬次的狀況,無效數據在計算前須要過濾掉。
數據處理有一個原則,多使用filter,這樣你真正須要分析的數據量就越少,處理速度就越快。
sqlContext.sql("...where col is not null and col != ''")
具體可參考:
解決spark中遇到的數據傾斜問題
任務傾斜
task傾斜緣由比較多,網絡io,cpu,mem都有可能形成這個節點上的任務執行緩慢,能夠去看該節點的性能監控來分析緣由。之前遇到過同事在spark的一臺worker上跑R的任務致使該節點spark task運行緩慢。
或者能夠開啓spark的推測機制,開啓推測機制後若是某一臺機器的幾個task特別慢,推測機制會將任務分配到其餘機器執行,最後Spark會選取最快的做爲最終結果。
堆內存溢出
java.lang.OutOfMemoryError: Java heap space
內存不夠,數據太多就會拋出OOM的Exeception,主要有driver OOM和executor OOM兩種
driver OOM
通常是使用了collect操做將全部executor的數據聚合到driver致使。儘可能不要使用collect操做便可。
executor OOM
能夠按下面的內存優化的方法增長code使用內存空間
spark.executor.memory
的值org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
若是你在worker中調用了driver中定義的一些變量,Spark就會將這些變量傳遞給Worker,這些變量並無被序列化,因此就會看到如上提示的錯誤了。
val x = new X() //在driver中定義的變量 dd.map{r => x.doSomething(r) }.collect //map中的代碼在worker(executor)中執行
除了上文的map,還有filter,foreach,foreachPartition等操做,還有一個典型例子就是在foreachPartition中使用數據庫建立鏈接方法。這些變量沒有序列化致使的任務報錯。
下面提供三種解決方法:
sparkConf
,SparkContext
,都用 @transent
進行註解,表示這些變量不須要被序列化Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 374 tasks (1026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
spark.driver.maxResultSize默認大小爲1G 每一個Spark action(如collect)全部分區的序列化結果的總大小限制,簡而言之就是executor給driver返回的結果過大,報這個錯說明須要提升這個值或者避免使用相似的方法,好比countByValue,countByKey等。
將值調大便可
spark.driver.maxResultSize 2g
WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.
這個WARN可能還會致使ERROR
Caused by: java.lang.RuntimeException: Failed to commit task Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit
若是你比較瞭解spark中的stage是如何劃分的,這個問題就比較簡單了。
一個Stage中包含的task過大,通常因爲你的transform過程太長,所以driver給executor分發的task就會變的很大。
因此解決這個問題咱們能夠經過拆分stage解決。也就是在執行過程當中調用cache.count
緩存一些中間數據從而切斷過長的stage。
driver did not authorize commit
driver節點內存不足
driver內存不足致使沒法啓動application,將driver分配到內存足夠的機器上或減小driver-memory
Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x0000000680000000, 4294967296, 0) failed;
error=’Cannot allocate memory’ (errno=12)
hdfs空間不夠
hdfs空間不足,event_log沒法寫入,因此 ListenerBus會報錯
,增長hdfs空間(刪除無用數據或增長節點)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/spark-history/app-20151228095652-0072.inprogress could only be replicated to 0 nodes instead of minReplication (=1) ERROR LiveListenerBus: Listener EventLoggingListener threw an exception java.lang.reflect.InvocationTargetException
spark編譯包與Hadoop版本不一致
下載對應hadoop版本的spark包或本身編譯。
java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID
driver機器端口使用過多
在一臺機器上沒有指定端口的狀況下,提交了超過15個任務。
16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI java.net.BindException: 地址已在使用: Service 'SparkUI' failed after 16 retries!
提交任務時指定app web ui端口號解決:
--conf spark.ui.port=xxxx
中文亂碼
使用write.csv等方法寫出到hdfs的文件,中文亂碼。JVM使用的字符集若是沒有指定,默認會使用系統的字符集,由於各個節點系統字符集並不都是UTF8致使,因此會出現這個問題。直接給JVM指定字符集便可。
spark-defaults.conf
spark.executor.extraJavaOptions -Dfile.encoding=UTF-8
java.io.UIException: Cannot run program "python2.7": error=2,沒有那個文件或目錄
spark使用的Python版本爲2.7,centOS默認python版本爲2.6,升級便可。
部分節點上有錯誤提示
java.io.IOExeception: Cannot run program "python2.7": error=13, 權限不夠
新加的節點運維裝2.7版本的python,python命令是正確的,python2.7卻沒法調用,只要改改環境變量就行了。
TypeError: ('__cinit__() takes exactly 8 positional arguments (11 given)', <type 'sklearn.tree._tree.Tree'>, (10, array([1], dtype=int32), 1, <sklearn.tree._tree.RegressionCriterion object at 0x100077480>, 50.0, 2, 1, 0.1, 10, 1, <mtrand.RandomState object at 0x10a55da08>))
該pickle文件是在0.17版本的scikit-learn下訓練出來的,有些機器裝的是0.14版本,版本不一致致使,升級可解決,記得將老版本數據清理乾淨,不然會報各類Cannot import xxx
的錯誤。
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)
方法1:
import sys
reload(sys) sys.setdefaultencoding('utf-8')
方法2:
//報錯
str(u'中國') //不報錯 str(u'中國'.encode('utf-8'))
有時候會發現部分executor並無在執行任務,爲何呢?
(1) 任務partition數過少,
要知道每一個partition只會在一個task上執行任務。改變分區數,能夠經過 repartition
方法,即便這樣,在 repartition
前仍是要從數據源讀取數據,此時(讀入數據時)的併發度根據不一樣的數據源受到不一樣限制,經常使用的大概有如下幾種:
hdfs - block數就是partition數 mysql - 按讀入時的分區規則分partition es - 分區數即爲 es 的 分片數(shard)
(2) 數據本地性的反作用
taskSetManager在分發任務以前會先計算數據本地性,優先級依次是:
process(同一個executor) -> node_local(同一個節點) -> rack_local(同一個機架) -> any(任何節點)
Spark會優先執行高優先級的任務,任務完成的速度很快(小於設置的spark.locality.wait時間),則數據本地性下一級別的任務則一直不會啓動,這就是Spark的延時調度機制。
舉個極端例子:運行一個count任務,若是數據全都堆積在某一臺節點上,那將只會有這臺機器在長期計算任務,集羣中的其餘機器則會處於等待狀態(等待本地性降級)而不執行任務,形成了大量的資源浪費。
判斷的公式爲:
curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)
其中 curTime
爲系統當前時間,lastLaunchTime
爲在某優先級下最後一次啓動task的時間
若是知足這個條件則會進入下一個優先級的時間判斷,直到 any
,不知足則分配當前優先級的任務。
數據本地性任務分配的源碼在 taskSetManager.Scala
。
若是存在大量executor處於等待狀態,能夠下降如下參數的值(也能夠設置爲0),默認都是3s。
spark.locality.wait spark.locality.wait.process spark.locality.wait.node spark.locality.wait.rack
當你數據本地性不好,可適當提升上述值,固然也能夠直接在集羣中對數據進行balance。
有可能哪臺worker節點出現了故障,task執行失敗後會在該 executor
上不斷重試,達到最大重試次數後會致使整個 application
執行失敗,咱們能夠設置失敗黑名單(task在該節點運行失敗後會換節點重試),能夠看到在源碼中默認設置的是 0
,
private val EXECUTOR_TASK_BLACKLIST_TIMEOUT = conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
在 spark-default.sh
中設置
spark.scheduler.executorTaskBlacklistTime 30000
當 task
在該 executor
運行失敗後會在其它 executor
中啓動,同時此 executor
會進入黑名單30s(不會分發任務到該executor)。
若是你的任務shuffle量特別大,同時rdd緩存比較少能夠更改下面的參數進一步提升任務運行速度。
spark.storage.memoryFraction
- 分配給rdd緩存的比例,默認爲0.6(60%),若是緩存的數據較少能夠下降該值。 spark.shuffle.memoryFraction
- 分配給shuffle數據的內存比例,默認爲0.2(20%)
剩下的20%內存空間則是分配給代碼生成對象等。
若是任務運行緩慢,jvm進行頻繁gc或者內存空間不足,或者能夠下降上述的兩個值。 "spark.rdd.compress","true"
- 默認爲false,壓縮序列化的RDD分區,消耗一些cpu減小空間的使用
spark.default.parallelism
發生shuffle時的並行度,在standalone模式下的數量默認爲core的個數,也可手動調整,數量設置太大會形成不少小任務,增長啓動任務的開銷,過小,運行大數據量的任務時速度緩慢。
spark.sql.shuffle.partitions
sql聚合操做(發生shuffle)時的並行度,默認爲200,若是該值過小會致使OOM,executor丟失,任務執行時間過長的問題
相同的兩個任務:
spark.sql.shuffle.partitions=300:
spark.sql.shuffle.partitions=500:
速度變快主要是大量的減小了gc的時間。
可是設置過大會形成性能惡化,過多的碎片task會形成大量無謂的啓動關閉task開銷,還有可能致使某些task hang住沒法執行。
修改map階段並行度主要是在代碼中使用rdd.repartition(partitionNum)
來操做。
spark-sql join優化
map-side-join 關聯優化
Spark不一樣Cluster Manager下的數據本地性表現
spark讀取hdfs數據本地性異常