spark中執行任務會顯示以下格式的進度:
[Stage 4:=========================> (12 + 11) / 24] # 這是stage4階段: ## 共有24個task(一個partition對應一個task,因此有24個partition) ## 當前正在並行執行的task數量爲11。 ## 這裏沒有executor數,由於一個executor裏能夠同時執行多個task(每一個task至少要佔用一個虛擬核vcore) ## 已經有12個task執行完成
觀察這個進度過程有利於看出是否存在數據傾斜:若其中1個task的完成時間明顯高於其餘task,說明極可能這個task處理的數據量多於其餘task。java
executor和task關係:
一個executor能夠並行執行多個task,實際上一個executor是一個進程,task是executor裏的一個線程。
一個task至少要獨佔executor裏的一個虛擬核心vcore。
一個executor裏的核心數由spark-submit的--executor-cores
參數指定。
一個task要佔用幾個核心,能夠由.config("spark.task.cpus", 1)
配置,默認是1即一個task佔用一個vcore。shell
同時並行執行的task最大數量 = executor數目 * (每一個executor核數 / 每一個task佔用核心數)json
任務執行快結束可能會變成這樣:
[Stage 4:=============================================> (22 + 2) / 24]
由於這時候還有2個task沒有完成,此時有些executor可能已經空閒下來了。spa
DataFrameReader讀取csv和json若是設了以下選項,會形成生成的DataFrame只有一個partition,也就是隻有一個task:線程
.option("multiLine", true) //加入此行會形成生成的DataFrame只有一個partition
由於spark要考慮讀取多行解析文件數據,因此不能進行文件的隨意分割。code
相反,若是是單行模式則能夠以任意行結束符進行分割,就能並行讀取,生成的DataFrame就能有多個分區。
若是要讀取的文本文件在hdfs上,生成DataFrame的分區數等於原始文件的block數。如1345MB文件,block大小128MB,會生成11個partition(1345/128=10.5)。進程
查看Dataset分區數:
ds.rdd.getNumPartitions
改變分區數:
ds.repartition #能任意改變分區數,可是速度慢 ds.coalesce #只能減小分區數,對平衡數據傾斜有效,並且是窄依賴因此速度塊。