剖析Spark數據分區之Spark RDD分區

本文來自OPPO互聯網技術團隊,是《剖析Spark數據分區》系列文章的第二篇,將重點分析Spark RDD的數據分區。該系列共分3篇文章,歡迎持續關注。sql

  • 第一篇:主要分析Hadoop中的分片
  • 第二篇:主要分析Spark RDD的分區;
  • 第三篇:主要分析Spark Streaming,TiSpark中的數據分區;

轉載請註名做者,同時歡迎關注OPPO互聯網技術團隊的公衆號:OPPO_tech,一同分享OPPO前沿互聯網技術及活動。apache

Spark

咱們以Spark on Yarn爲例闡述Spark運行原理。segmentfault

任務運行步驟

1.客戶端提交Application到RM,RM判斷集羣資源是否知足需求 ;分佈式

2.RM在集羣中選擇一臺NodeManager啓動Application Master(cluster模式);ide

3.Driver在AM所在的NodeManager節點啓動進程; 函數

4.AM向ResourceManager申請資源,並在每臺NodeManager上啓動相應的executors;oop

5.Driver開始進行任務調度,經過Transaction操做造成了RDD血緣關係圖,即DAG圖,最後經過Action的調用,觸發Job並調度執行;源碼分析

6.DAGScheduler負責Stage級的調度,主要是將DAG切分紅若干個Stages,並將每一個Stage打包成Taskset交給TaskScheduler調度;性能

7.TaskScheduler負責Task級的調度,將DAGScheduler給過來的Taskset按照指定的調度策略分發到Executor上執行;大數據

Spark RDD

RDD 彈性分佈式數據集,RDD包含5個特徵

1.Compute:

RDD在任務計算時是以分區爲單位的,經過Compute計算獲得每個分片的數據,不一樣的RDD子類能夠實現本身的compute方法;

2.getPartitions:

計算獲取全部分區列表,RDD是一個分區的集合,一個RDD有一個或者多個分區,分區的數量決定了Spark任務的並行度;

3.getDependencies:

獲取RDD的依賴,每一個RDD都有依賴關係(源RDD的依賴關係爲空),這些依賴關係成爲lineage;

4.getPreferredLocations:

對其餘RDD的依賴列表,Spark在進行任務調度時,會嘗試將任務分配到數據所在的機器上,從而避免了機器間的數據傳輸,RDD獲取優先位置的方法爲getPreferredLocations,通常只有涉及到從外部存儲結構中讀取數據時纔會有優先位置,好比HadoopRDD, ShuffleRDD;

5.Partitioner:

決定數據分到哪一個Partition,對於非key-value類型的RDD,Partitioner爲None, 對應key-value類型的RDD,Partitioner默認爲HashPartitioner。在進行shuffle操做時,如reduceByKey, sortByKey,Partitioner決定了父RDD shuffle的輸出時對應的分區中的數據是如何進行map的;

前2個函數是全部RDD必須的,後三個可選,全部的RDD均繼承了該接口。

Spark Partition

因爲RDD的數據量很大,所以爲了計算方便,須要將RDD進行切分並存儲在各個節點的分區當中,從而當咱們對RDD進行各類計算操做時,其實是對每一個分區中的數據進行並行的操做。

也就是一份待處理的原始數據會被按照相應的邏輯切分紅多分,每份數據對應RDD的一個Partition,partition的數量決定了task的數量,影響程序的並行度,Partition是伴生的,也就是說每一種RDD都有其對應的Partition實現。

HadoopRDD

Spark常常須要從hdfs讀取文件生成RDD,而後進行計算分析。這種從hdfs讀取文件生成的RDD就是HadoopRDD。

HadoopRDD主要重寫了RDD接口的三個方法:

  1. override def getPartitions: Array[Partition]
  2. override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)]
  3. override def getPreferredLocations(split:Partition): Seq[String]

決定分區數量的邏輯在getPartitions中,實際上調用的是InputFormat.getSplits,

InputFormat是一個接口:org.apache.hadoop.mapred.InputFormat,其中getInputSplit就是 圖 – 7 中所展現的。

從源碼分析可知,在HadoopRDD這種場景下,RDD的分區數在生成RDD以前就已經決定了,是被HADOOP的參數所決定的,咱們能夠經過調整:

spark.hadoop.mapreduce.input.fileinputformat.split.minsize;
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize;

的大小來調整HadoopRDD的分區數量。

Spark SQL中的分區

Spark SQL 最終將SQL 語句通過邏輯算子樹轉換成物理算子樹。

在物理算子樹中,葉子類型的SparkPlan 節點負責從無到有的建立RDD ,每一個非葉子類型的SparkPlan 節點等價於在RDD 上進行一次Transformation ,即經過調用execute()函數轉換成新的RDD ,最終執行collect()操做觸發計算,返回結果給用戶。

重點分析一下葉子節點:

在Spark SQL 中,LeafExecNode 類型的SparkPlan 負責對初始RDD 的建立。

HiveTableScanExec 會根據Hive數據表存儲的HDFS 信息直接生成HadoopRDD;FileSourceScanExec 根據數據表所在的源文件生成FileScanRDD 。

當向Hive metastore中讀寫Parquet表時文件,轉化的方式經過 spark.sql.hive.convertMetastoreParquet 控制。

默認爲true,若是設置爲 true

會使用 :

org.apache.spark.sql.execution.FileSourceScanExec ,

不然會使用 :

org.apache.spark.sql.hive.execution.HiveTableScanExec

目前FileSourceScanExec包括建立分桶表RDD,非分桶表RDD,無論哪一種方式,最終生成的都是一個FileRDD。

下面分析的建立非分桶表的RDD

FileRDD的getPartition方法:

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

要獲取maxSplitBytes,那麼決定因素在如下三個參數:

結論:

若是想要使得maxSplitBytes值變大,也就是分區數變小。

可經過將defaultMaxSplitBytes值調大,

也就是spark.sql.files.maxPartitionBytes,

將spark.sql.files.openCostInBytes也調大;

若是若是想要使得maxSplitBytes值變小,也就是分區數變大。

能夠將defaultMaxSplitBytes值調小,

也就是spark.sql.files.maxPartitionBytes,

將spark.sql.files.openCostInBytes也調小。

下面分析FileSourceScanExec的建立分桶表的RDD。

經過源碼分析,分桶表的分區數量跟桶的數量是一對一關係。

HiveTableScanExec

HiveTableScanExec 會根據 Hive數據表存儲的 HDFS 信息直接生成 HadoopRDD。

一般狀況下,HiveTableScanExec經過文件數量,大小進行分區。

例如:

讀入一份 2048M 大小的數據,hdfs 塊大小設置爲 128M

1) 該目錄有1000個小文件

答案:則會生成1000個partition。

2) 若是隻有1個文件,

答案:則會生成 16 個partition

3) 若是有一個大文件1024M,其他999 個文件共 1024M

答案:則會生成 1007個分區。

針對HiveTableScanExec類型的調優可參考HadoopRDD。

RDD transformation

一個RDD經過transformation轉換成另一個RDD,那麼新生成的分區數量是多少呢?

1) filter(), map(), flatMap(), distinct()

partition數量等於parent RDD的數量。

2) rdd.union(other_rdd)

partition數量等於rdd_size + other_rdd_size

3) rdd.intersection(other_rdd)

partition數量等於max(rdd_size, other_rdd_size)

4) rdd.subtract(other_rdd)

partition數量等於rdd_size

5) rdd.cartesian(other_rdd)

partition數量等於rdd_size * other_rdd_size

RDD coalesce 與 repartition

有時候須要從新設置RDD的分區數量,好比RDD的分區中,RDD分區比較多,可是每一個RDD的數量比較小,分區數量增多可增大任務並行度,可是有可能形成每一個分區的數據量偏少,分區數據量太少致使節點間通訊時間在整個任務執行時長佔比被放大,因此須要設置一個比較合理的分區。

有兩種方法能夠重設RDD分區:分別是coalesce()方法和repartition()。

Repartition是coalesce函數中shuffle爲true的特例。

分佈式計算中,每一個節點只計算部分數據,也就是隻處理一個分片,那麼要想求得某個key對應的所有數據,好比reduceByKey、groupByKey,那就須要把相同key的數據拉取到同一個分區,原分區的數據須要被打亂重組,這個按照必定的規則對數據從新分區的過程就是Shuffle(洗牌)。

Shuffle是鏈接Map和Reduce之間的橋樑,描述的是數據從Map端到Reduce端的過程。

當增長並行度的時候,額外的shuffle是有利的。例如,數據中有一些文件是不可分割的,那麼該大文件對應的分區就會有大量的記錄,而不是說將數據分散到儘量多的分區內部來使用全部已經申請cpu。在這種狀況下,使用Reparition從新產生更多的分區數,以知足後面轉換算子所需的並行度,這會提高很大性能。

分析coalesce函數源碼

Shuffle = true

數據進行基於新的分區數量進行hash計算, 隨機選擇輸出分區,將輸入分區的數據

輸出到輸出分區中。

Shuffle = false

分析CoalescedRDD源碼的getPartitions方法

PartitionCoalescer的做用是:

1) 保證CoalescedRDD的每一個分區基本上對應於它Parent RDD分區的個數相同;

2) CoalescedRDD的每一個分區,儘可能跟它的Parent RDD的本地性形同。好比說CoalescedRDD的分區1對應於它的Parent RDD的1到10這10個分區,可是1到7這7個分區在節點1.1.1.1上;那麼 CoalescedRDD的分區1所要執行的節點就是1.1.1.1。這麼作的目的是爲了減小節點間的數據通訊,提高處理能力;

3) CoalescedRDD的分區儘可能分配到不一樣的節點執行;具體實現可參考DefaultPartitionCoalescer類。

下面以一個例子來分析Repartition和Coalesce。

假設源RDD有N個分區,須要從新劃分紅M個分區

一、Repartition實現:

若是N<M

通常狀況下N個分區有數據分佈不均勻的情況,利用HashPartitioner函數將數據從新分區爲M個,這時須要將shuffle設置爲true;

二、Coalesce實現:

若是N>M

1) N和M相差很少,(假如N是1000,M是100)那麼就能夠將N個分區中的若干個分區合併成一個新的分區,最終合併爲M個分區,這時能夠將shuffle設置爲false;

2) 若是M>N時,coalesce是無效的,不進行shuffle過程,父RDD和子RDD之間是窄依賴關係,沒法使文件數(partiton)變多。總之若是shuffle爲false時,若是傳入的參數大於現有的分區數目,RDD的分區數不變,也就是說不通過shuffle,是沒法將RDD的分區數變多的;

3) 若是N>M而且二者相差懸殊,這時要看executor數與要生成的partition關係,若是executor數 <= 要生成partition數,coalesce效率高,反之若是用coalesce會致使(executor數-要生成partiton數)個excutor空跑從而下降效率。

結語

Spark做爲當前用戶使用最普遍的大數據計算引擎之一,在數據分析師中有着普遍的應用,以其處理速度快著稱,經過上述的分析,咱們可以以合理的計算資源,包括CPU, 內存,executor來執行計算任務,使得咱們的集羣更高效,在相同的計算資源場景下能得到更多的任務產出。

相關文章
相關標籤/搜索