Spark:partition、task、executor關係

spark中執行任務會顯示以下格式的進度:

[Stage 4:=========================>                              (12 + 11) / 24]
# 這是stage4階段:
## 共有24個task(一個partition對應一個task,因此有24個partition)
## 當前正在並行執行的task數量爲11。
## 這裏沒有executor數,由於一個executor裏能夠同時執行多個task(每一個task至少要佔用一個虛擬核vcore)
## 已經有12個task執行完成

觀察這個進度過程有利於看出是否存在數據傾斜:若其中1個task的完成時間明顯高於其餘task,說明極可能這個task處理的數據量多於其餘task。java

executor和task關係:

一個executor能夠並行執行多個task,實際上一個executor是一個進程,task是executor裏的一個線程。
一個task至少要獨佔executor裏的一個虛擬核心vcore。
一個executor裏的核心數由spark-submit的--executor-cores參數指定。
一個task要佔用幾個核心,能夠由.config("spark.task.cpus", 1)配置,默認是1即一個task佔用一個vcore。shell

同時並行執行的task最大數量 = executor數目 * (每一個executor核數 / 每一個task佔用核心數)json

任務執行快結束可能會變成這樣:

[Stage 4:=============================================>          (22 + 2) / 24]

由於這時候還有2個task沒有完成,此時有些executor可能已經空閒下來了。spa


DataFrameReader讀取csv和json若是設了以下選項,會形成生成的DataFrame只有一個partition,也就是隻有一個task:線程

.option("multiLine", true)  
//加入此行會形成生成的DataFrame只有一個partition

由於spark要考慮讀取多行解析文件數據,因此不能進行文件的隨意分割。code

相反,若是是單行模式則能夠以任意行結束符進行分割,就能並行讀取,生成的DataFrame就能有多個分區。
若是要讀取的文本文件在hdfs上,生成DataFrame的分區數等於原始文件的block數。如1345MB文件,block大小128MB,會生成11個partition(1345/128=10.5)。進程

查看Dataset分區數:

ds.rdd.getNumPartitions

改變分區數:

ds.repartition #能任意改變分區數,可是速度慢
ds.coalesce #只能減小分區數,對平衡數據傾斜有效,並且是窄依賴因此速度塊。
相關文章
相關標籤/搜索