分區是指如何把RDD分佈在spark集羣的各個節點的操做。以及一個RDD可以分多少個分區。html
一個分區是大型分佈式數據集的邏輯塊。node
那麼思考一下:分區數如何映射到spark的任務數?如何驗證?分區和任務如何對應到本地的數據?git
Spark使用分區來管理數據,這些分區有助於並行化分佈式數據處理,並以最少的網絡流量在executors之間發送數據。程序員
默認狀況下,Spark嘗試從靠近它的節點讀取數據到RDD。因爲Spark一般訪問分佈式分區數據,爲了優化transformation(轉換)操做,它建立分區來保存數據塊。redis
存在在HDFS或Cassandra中的分區數據是一一對應的(因爲相同的緣由進行分區)。shell
默認狀況下,每一個HDFS的分區文件(默認分區文件塊大小是64M)都會建立一個RDD分區。apache
默認狀況下,不須要程序員干預,RDD會自動進行分區。但有時候你須要爲你的應用程序,調整分區的大小,或者使用另外一種分區方案。後端
你能夠經過方法def getPartitions: Array[Partition]
來獲取RDD的分區數量。網絡
在spark-shell中執行如下代碼:
val v = sc.parallelize(1 to 100)
併發
scala> v.getNumPartitions
res2: Int = 20 //RDD的分區數是20?why? 緣由在後面講解。
通常來講分區數和任務數是相等的。以上代碼能夠看到分區是20個,再從spark管理界面上看,有20個任務。
能夠經過參數指定RDD的分區數:
val v = sc.parallelize(1 to 100, 2)
scala> v.getNumPartitions
res2: Int = 2 //RDD的分區數是2
能夠看出,指定了分區數量之後,輸出的是指定的分區數。經過界面上看,只有2個任務。
分區塊越小,分區數量就會越多。分區數據就會分佈在更多的worker節點上。但分區越多意味着處理分區的計算任務越多,太大的分區數量(任務數量)多是致使Spark任務運行效率低下的緣由之一。
因此,太大或過小的分區都有可能致使Spark任務執行效率低下。那麼,應該如何設置RDD的分區?
Spark只能爲RDD的每一個分區運行1個併發任務,直到達到Spark集羣的CPU數量。
因此,若是你有一個擁有50個CPU的Spark集羣,那麼你可讓RDD至少有50個分區(或者是CPU數量的2到3倍)。
一個比較好的分區數的值至少是executors的數量。能夠經過參數設置RDD的默認分區數,也就是咱們所說的並行度:
sc.defaultParallelism
上一節中,當沒有設定分區時,在個人Spark集羣中默認的分區數是20,是由於在Spark默認配置文件:conf/spark-defaults.conf中我設置了變量:
spark.default.parallelism 20
一樣,RDD的action函數產生的輸出文件數量,也是由分區的數量來決定的。
分區數量的上限,取決於executor的可用內存大小。
RDD執行的第一個transformation函數的分區數量,決定了在該RDD上執行的後續一系列處理過程的分區數量。例如從hdfs讀取數據的函數:
sc.textFile(path, partition)
當使用函數 rdd = SparkContext().textFile("hdfs://…/file.txt")
時,你獲得的分區數量可能不多,這將會和HDFS的塊的多少相等。但當你的文件中的行比較大時,獲得的分區可能更少。
你也能夠經過textFile函數的第2個參數指定讀取的分區數量,但該分區數量:
sc.textFile("hdfs://host:port/path", 200)
這樣讀取path的文件後,會生成200個分區。
注意:第2個參數指定的分區數,必須大於等於2。
注意:以上描述只是對非壓縮文件適用,對於壓縮文件不能在textFile中指定分區數,而是要進行repartition:
rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)
一些函數,例如:map,flatMap,filter不會保留分區。會把每一個函數應用到每個分區上。
函數的定義定義以下:
/**
* Return a new RDD that has exactly numPartitions partitions.
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.
* If you are decreasing the number of partitions in this RDD, consider using coalesce
, which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
從代碼上能夠看到,repartition是shuffle和numPartitions分區的合併操做。
若分區策略不符合你的應用場景,你能夠編寫本身的Partitioner。
改函數的代碼以下:
/**
* Return a new RDD that is reduced into numPartitions
partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
… …
}
coalesce轉換用於更改分區數。它能夠根據shuffle標誌觸發RDD shuffle(默認狀況下禁用shuffle,即爲false)
從以上代碼註釋能夠看出:該函數是一個合併分區的操做,通常該函數用來進行narrow轉換。爲了讓該函數並行執行,一般把shuffle的值設置成true。
scala> val rdd = sc.parallelize(0 to 10, 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24
scala> rdd.partitions.size
res0: Int = 8
scala> rdd.coalesce(numPartitions=8, shuffle=false) (1)
res1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at :27
scala> res1.toDebugString
res2: String =
(8) CoalescedRDD[1] at coalesce at :27 []
| ParallelCollectionRDD[0] at parallelize at :24 []
scala> rdd.coalesce(numPartitions=8, shuffle=true)
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at coalesce at :27
scala> res3.toDebugString
res4: String =
(8) MapPartitionsRDD[5] at coalesce at :27 []
| CoalescedRDD[4] at coalesce at :27 []
| ShuffledRDD[3] at coalesce at :27 []
+-(8) MapPartitionsRDD[2] at coalesce at :27 []
| ParallelCollectionRDD[0] at parallelize at :24 []
注意:
設置要用於HashPartitioner的分區數。它對應於調度程序後端的默認並行度。
它也和如下幾個數量對應:
本地模式的默認並行度的設置源碼以下:
case LOCAL_N_REGEX(threads) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
啓動spark-shell執行如下命令:
scala> val someRDD = sc.parallelize(1 to 100, 4)
someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :27
scala> someRDD.map(x => x).collect
17/06/20 07:37:54 INFO spark.SparkContext: Starting job: collect at console:30
… …
再經過spark管理界面查看任務執行狀況:
在終端的spark-shell下執行如下命令:
scala> someRDD.setName("toy").cache
scala> someRDD.map(x => x).collect
再經過spark UI查看cache的狀況: