鍵值對RDD數據分區器apache
做者:尹正傑數組
版權聲明:原創做品,謝絕轉載!不然將追究法律責任。服務器
一.鍵值對RDD數據分區器概述ide
Spark目前支持Hash分區和Range分區,用戶也能夠自定義分區,Hash分區爲當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據通過Shuffle過程屬於哪一個分區和Reduce的個數。 舒適提示: 1>.只有Key-Value類型的RDD纔有分區器的,非Key-Value類型的RDD分區器的值是None 2>.每一個RDD的分區ID範圍:0~numPartitions-1,決定這個值是屬於那個分區的。
二.獲取RDD分區方式 函數
package com.yinzhengjie.bigdata.spark.partitioner import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.HashPartitioner import org.apache.spark.rdd.RDD /** * 能夠經過使用RDD的partitioner 屬性來獲取 RDD 的分區方式。它會返回一個 scala.Option 對象,經過get方法獲取其中的值。 */ object GetRDDPartition { def main(args: Array[String]): Unit = { //初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) val listRDD:RDD[(Int,Int)] = sc.parallelize(List((1,1),(2,2),(3,3))) //查看RDD的分區器 println(listRDD.partitioner) //使用HashPartitioner算子對RDD進行從新分區 val partitionByRDD = listRDD.partitionBy(new HashPartitioner(2)) //查看從新分區後RDD的分區器 println(partitionByRDD.partitioner) } }
三.Hash分區spa
package com.yinzhengjie.bigdata.spark.partitioner import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD /** * HashPartitioner分區的原理: * 對於給定的key,計算其hashCode,併除以分區的個數取餘,若是餘數小於0,則用餘數+分區的個數(不然加0),最後返回的值就是這個key所屬的分區ID。 */ object HashPartition { def main(args: Array[String]): Unit = { //初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) val listRDD:RDD[(Int,Int)] = sc.makeRDD(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8),(1,5),(2,6)),12) //查看RDD的分區器(能夠經過使用RDD的partitioner 屬性來獲取 RDD 的分區方式。它會返回一個 scala.Option 對象,) println(listRDD.partitioner) listRDD.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect.foreach(println) val hashpar:RDD[(Int, Int)] = listRDD.partitionBy(new org.apache.spark.HashPartitioner(7)) println(hashpar.count) println(hashpar.partitioner) hashpar.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println) } }
四.Ranger分區scala
HashPartitioner分區弊端:
可能致使每一個分區中數據量的不均勻,極端狀況下會致使某些分區擁有RDD的所有數據。
RangePartitioner做用:
將必定範圍內的數映射到某一個分區內,儘可能保證每一個分區中數據量的均勻,並且分區與分區之間是有序的,一個分區中的元素確定都是比另外一個分區內的元素小或者大,可是分區內的元素是不能保證順序的。簡單的說就是將必定範圍內的數映射到某一個分區內。
實現過程爲: 第一步:先重整個RDD中抽取出樣本數據,將樣本數據排序,計算出每一個分區的最大key值,造成一個Array[KEY]類型的數組變量rangeBounds; 第二步:判斷key在rangeBounds中所處的範圍,給出該key值在下一個RDD中的分區id下標;該分區器要求RDD中的KEY類型必須是能夠排序的。
舒適提示:
若是想要使用Ranger方式進行分區(這種方式和Hbase的預分區優勢相似),那麼對數據應該有兩個要求,即數據是能夠排序和比較。所以Spark基本上不多使用這種方式。
五.自定義分區code
package com.yinzhengjie.bigdata.spark.partitioner import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.Partitioner /** * 要實現自定義的分區器,你須要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。 * numPartitions: Int: * 返回建立出來的分區數。 * getPartition(key: Any): Int: * 返回給定鍵的分區編號(0到numPartitions-1)。 * equals(): * Java 判斷相等性的標準方法。這個方法的實現很是重要,Spark 須要用這個方法來檢查你的分區器對象是否和其餘分區器實例相同,這樣 Spark 才能夠判斷兩個 RDD 的分區方式是否相同。 * */ class CustomerPartitioner(numParts:Int) extends Partitioner{ //覆蓋分區數 override def numPartitions: Int = numParts //覆蓋分區號獲取函數 override def getPartition(key: Any): Int = { val ckey: String = key.toString ckey.substring(ckey.length-1).toInt%numParts } } /** * 使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法便可。 * * Spark 中有許多依賴於數據混洗的方法,好比 join() 和 groupByKey(),它們也能夠接收一個可選的 Partitioner 對象來控制輸出數據的分區方式。 */ object CustomPartition { def main(args: Array[String]): Unit = { //初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //建立listRDD,建設你的服務器是32core,咱們就使用32個切片,但因爲數據僅有8條,所以只有32個分區中僅有8個分區有數據喲~ val listRDD:RDD[(Int,Int)] = sc.makeRDD(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8),(1,5),(2,6)),32) //查看數據分佈狀況 listRDD.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect.foreach(println) //將RDD使用自定義的分區類進行從新分區 val par = listRDD.partitionBy(new CustomerPartitioner(3)) //查看從新分區後的數據分佈 par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect.foreach(println) } }