Spark2.0-RDD分區原理分析

Spark分區原理分析

介紹

分區是指如何把RDD分佈在spark集羣的各個節點的操做。以及一個RDD可以分多少個分區。html

一個分區是大型分佈式數據集的邏輯塊。node

那麼思考一下:分區數如何映射到spark的任務數?如何驗證?分區和任務如何對應到本地的數據?git

Spark使用分區來管理數據,這些分區有助於並行化分佈式數據處理,並以最少的網絡流量在executors之間發送數據。程序員

默認狀況下,Spark嘗試從靠近它的節點讀取數據到RDD。因爲Spark一般訪問分佈式分區數據,爲了優化transformation(轉換)操做,它建立分區來保存數據塊。redis

存在在HDFS或Cassandra中的分區數據是一一對應的(因爲相同的緣由進行分區)。shell

默認狀況下,每一個HDFS的分區文件(默認分區文件塊大小是64M)都會建立一個RDD分區。apache

默認狀況下,不須要程序員干預,RDD會自動進行分區。但有時候你須要爲你的應用程序,調整分區的大小,或者使用另外一種分區方案。後端

你能夠經過方法def getPartitions: Array[Partition]來獲取RDD的分區數量。網絡

在spark-shell中執行如下代碼: 

val v = sc.parallelize(1 to 100) 
scala> v.getNumPartitions 
res2: Int = 20 //RDD的分區數是20?why? 緣由在後面講解。 
併發

通常來講分區數和任務數是相等的。以上代碼能夠看到分區是20個,再從spark管理界面上看,有20個任務。

能夠經過參數指定RDD的分區數: 

val v = sc.parallelize(1 to 100, 2) 
scala> v.getNumPartitions 
res2: Int = 2 //RDD的分區數是2 

能夠看出,指定了分區數量之後,輸出的是指定的分區數。經過界面上看,只有2個任務。

分區大小對Spark性能的影響

分區塊越小,分區數量就會越多。分區數據就會分佈在更多的worker節點上。但分區越多意味着處理分區的計算任務越多,太大的分區數量(任務數量)多是致使Spark任務運行效率低下的緣由之一。

因此,太大或過小的分區都有可能致使Spark任務執行效率低下。那麼,應該如何設置RDD的分區?

Spark只能爲RDD的每一個分區運行1個併發任務,直到達到Spark集羣的CPU數量。 
因此,若是你有一個擁有50個CPU的Spark集羣,那麼你可讓RDD至少有50個分區(或者是CPU數量的2到3倍)。

一個比較好的分區數的值至少是executors的數量。能夠經過參數設置RDD的默認分區數,也就是咱們所說的並行度: 
sc.defaultParallelism 
上一節中,當沒有設定分區時,在個人Spark集羣中默認的分區數是20,是由於在Spark默認配置文件:conf/spark-defaults.conf中我設置了變量: 
spark.default.parallelism 20

一樣,RDD的action函數產生的輸出文件數量,也是由分區的數量來決定的。

分區數量的上限,取決於executor的可用內存大小。

RDD執行的第一個transformation函數的分區數量,決定了在該RDD上執行的後續一系列處理過程的分區數量。例如從hdfs讀取數據的函數: 
sc.textFile(path, partition)

當使用函數 rdd = SparkContext().textFile("hdfs://…​/file.txt")時,你獲得的分區數量可能不多,這將會和HDFS的塊的多少相等。但當你的文件中的行比較大時,獲得的分區可能更少。

你也能夠經過textFile函數的第2個參數指定讀取的分區數量,但該分區數量: 
sc.textFile("hdfs://host:port/path", 200) 
這樣讀取path的文件後,會生成200個分區。 
注意:第2個參數指定的分區數,必須大於等於2。

注意:以上描述只是對非壓縮文件適用,對於壓縮文件不能在textFile中指定分區數,而是要進行repartition: 

rdd = sc.textFile('demo.gz') 
rdd = rdd.repartition(100) 

一些函數,例如:map,flatMap,filter不會保留分區。會把每一個函數應用到每個分區上。

RDD的Repartition

函數的定義定義以下: 

/** 
* Return a new RDD that has exactly numPartitions partitions. 
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. 
* If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle. 
*/ 
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 
coalesce(numPartitions, shuffle = true) 


從代碼上能夠看到,repartition是shuffle和numPartitions分區的合併操做。

若分區策略不符合你的應用場景,你能夠編寫本身的Partitioner。

coalesce 轉換

改函數的代碼以下: 

/** 
* Return a new RDD that is reduced into numPartitions partitions. 

* This results in a narrow dependency, e.g. if you go from 1000 partitions 
* to 100 partitions, there will not be a shuffle, instead each of the 100 
* new partitions will claim 10 of the current partitions. 

* However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, 
* this may result in your computation taking place on fewer nodes than 
* you like (e.g. one node in the case of numPartitions = 1). To avoid this, 
* you can pass shuffle = true. This will add a shuffle step, but means the 
* current upstream partitions will be executed in parallel (per whatever 
* the current partitioning is). 

* Note: With shuffle = true, you can actually coalesce to a larger number 
* of partitions. This is useful if you have a small number of partitions, 
* say 100, potentially with a few partitions being abnormally large. Calling 
* coalesce(1000, shuffle = true) will result in 1000 partitions with the 
* data distributed using a hash partitioner. 
*/ 
def coalesce(numPartitions: Int, shuffle: Boolean = false, 
partitionCoalescer: Option[PartitionCoalescer] = Option.empty) 
(implicit ord: Ordering[T] = null) 
: RDD[T] = withScope { 
… … 

coalesce轉換用於更改分區數。它能夠根據shuffle標誌觸發RDD shuffle(默認狀況下禁用shuffle,即爲false)

從以上代碼註釋能夠看出:該函數是一個合併分區的操做,通常該函數用來進行narrow轉換。爲了讓該函數並行執行,一般把shuffle的值設置成true。

coalesce使用舉例


scala> val rdd = sc.parallelize(0 to 10, 8) 
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.partitions.size 
res0: Int = 8

scala> rdd.coalesce(numPartitions=8, shuffle=false) (1) 
res1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at :27

scala> res1.toDebugString 
res2: String = 
(8) CoalescedRDD[1] at coalesce at :27 [] 
| ParallelCollectionRDD[0] at parallelize at :24 []

scala> rdd.coalesce(numPartitions=8, shuffle=true) 
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at coalesce at :27

scala> res3.toDebugString 
res4: String = 
(8) MapPartitionsRDD[5] at coalesce at :27 [] 
| CoalescedRDD[4] at coalesce at :27 [] 
| ShuffledRDD[3] at coalesce at :27 [] 
+-(8) MapPartitionsRDD[2] at coalesce at :27 [] 
| ParallelCollectionRDD[0] at parallelize at :24 [] 

注意:

  • 默認狀況下coalesce是不會進行shuffle。
  • 另外,分區數和源RDD的分區數保持一致。

分區相關參數

spark.default.parallelism

設置要用於HashPartitioner的分區數。它對應於調度程序後端的默認並行度。 
它也和如下幾個數量對應:

  • LocalSchedulerBackend是spark本地執行的調度器,此時,該參數的數量是,本地JVM的線程數。

本地模式的默認並行度的設置源碼以下: 

case LOCAL_N_REGEX(threads) => 
def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads. 
val threadCount = if (threads == "*") localCpuCount else threads.toInt 

  • Spark on Mesos的CPU數量,默認是8.
  • 總CPU數:totalCoreCount,在CoarseGrainedSchedulerBackend 是2。

如何查看RDD的分區

經過UI查看使用分區的任務執行

啓動spark-shell執行如下命令: 

scala> val someRDD = sc.parallelize(1 to 100, 4) 
someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :27

scala> someRDD.map(x => x).collect 
17/06/20 07:37:54 INFO spark.SparkContext: Starting job: collect at console:30 
… … 

再經過spark管理界面查看任務執行狀況: 
這裏寫圖片描述

經過UI查看Partition Caching

在終端的spark-shell下執行如下命令: 

scala> someRDD.setName("toy").cache 
scala> someRDD.map(x => x).collect 

再經過spark UI查看cache的狀況: 
這裏寫圖片描述

經過函數調用獲取分區數量

  • RDD.getNumPartitions
  • rdd.partitions.size

參考文檔: How Many Partitions Does An RDD Have

相關文章
相關標籤/搜索