鍵值對RDD數據分區器

                         鍵值對RDD數據分區器apache

                                     做者:尹正傑數組

版權聲明:原創做品,謝絕轉載!不然將追究法律責任。服務器

 

 

 

 

一.鍵值對RDD數據分區器概述ide

  Spark目前支持Hash分區和Range分區,用戶也能夠自定義分區,Hash分區爲當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據通過Shuffle過程屬於哪一個分區和Reduce的個數。

  舒適提示:
    1>.只有Key-Value類型的RDD纔有分區器的,非Key-Value類型的RDD分區器的值是None
    2>.每一個RDD的分區ID範圍:0~numPartitions-1,決定這個值是屬於那個分區的。

 

二.獲取RDD分區方式 函數

package com.yinzhengjie.bigdata.spark.partitioner

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

/**
  *   能夠經過使用RDD的partitioner 屬性來獲取 RDD 的分區方式。它會返回一個 scala.Option 對象,經過get方法獲取其中的值。
  */
object GetRDDPartition {
  def main(args: Array[String]): Unit = {
    //初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val listRDD:RDD[(Int,Int)] = sc.parallelize(List((1,1),(2,2),(3,3)))

    //查看RDD的分區器
    println(listRDD.partitioner)

    //使用HashPartitioner算子對RDD進行從新分區
    val partitionByRDD = listRDD.partitionBy(new HashPartitioner(2))

    //查看從新分區後RDD的分區器
    println(partitionByRDD.partitioner)
  }
}

 

三.Hash分區spa

package com.yinzhengjie.bigdata.spark.partitioner

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  *   HashPartitioner分區的原理:
  *       對於給定的key,計算其hashCode,併除以分區的個數取餘,若是餘數小於0,則用餘數+分區的個數(不然加0),最後返回的值就是這個key所屬的分區ID。
  */
object HashPartition {
  def main(args: Array[String]): Unit = {
    //初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val listRDD:RDD[(Int,Int)] =  sc.makeRDD(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8),(1,5),(2,6)),12)

    //查看RDD的分區器(能夠經過使用RDD的partitioner 屬性來獲取 RDD 的分區方式。它會返回一個 scala.Option 對象,)
    println(listRDD.partitioner)

    listRDD.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect.foreach(println)

    val hashpar:RDD[(Int, Int)]  = listRDD.partitionBy(new org.apache.spark.HashPartitioner(7))
    println(hashpar.count)
    println(hashpar.partitioner)
    hashpar.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println)
  }
}

 

四.Ranger分區scala

  HashPartitioner分區弊端:
    可能致使每一個分區中數據量的不均勻,極端狀況下會致使某些分區擁有RDD的所有數據。
  RangePartitioner做用:
    將必定範圍內的數映射到某一個分區內,儘可能保證每一個分區中數據量的均勻,並且分區與分區之間是有序的,一個分區中的元素確定都是比另外一個分區內的元素小或者大,可是分區內的元素是不能保證順序的。簡單的說就是將必定範圍內的數映射到某一個分區內。
    實現過程爲:       第一步:先重整個RDD中抽取出樣本數據,將樣本數據排序,計算出每一個分區的最大key值,造成一個Array[KEY]類型的數組變量rangeBounds;       第二步:判斷key在rangeBounds中所處的範圍,給出該key值在下一個RDD中的分區id下標;該分區器要求RDD中的KEY類型必須是能夠排序的。

  舒適提示:
    若是想要使用Ranger方式進行分區
(這種方式和Hbase的預分區優勢相似),那麼對數據應該有兩個要求,即數據是能夠排序和比較。所以Spark基本上不多使用這種方式。

 

五.自定義分區code

package com.yinzhengjie.bigdata.spark.partitioner

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.Partitioner

/**
  *     要實現自定義的分區器,你須要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。
  *       numPartitions: Int:
  *         返回建立出來的分區數。
  *       getPartition(key: Any): Int:
  *         返回給定鍵的分區編號(0到numPartitions-1)。
  *       equals():
  *          Java 判斷相等性的標準方法。這個方法的實現很是重要,Spark 須要用這個方法來檢查你的分區器對象是否和其餘分區器實例相同,這樣 Spark 才能夠判斷兩個 RDD 的分區方式是否相同。
  *
  */
class CustomerPartitioner(numParts:Int) extends Partitioner{

  //覆蓋分區數
  override def numPartitions: Int = numParts

  //覆蓋分區號獲取函數
  override def getPartition(key: Any): Int = {
    val ckey: String = key.toString
    ckey.substring(ckey.length-1).toInt%numParts
  }
}


/**
  *     使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法便可。
  *
  *     Spark 中有許多依賴於數據混洗的方法,好比 join() 和 groupByKey(),它們也能夠接收一個可選的 Partitioner 對象來控制輸出數據的分區方式。
  */
object CustomPartition {
  def main(args: Array[String]): Unit = {
    //初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //建立listRDD,建設你的服務器是32core,咱們就使用32個切片,但因爲數據僅有8條,所以只有32個分區中僅有8個分區有數據喲~
    val listRDD:RDD[(Int,Int)] =  sc.makeRDD(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8),(1,5),(2,6)),32)

    //查看數據分佈狀況
    listRDD.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect.foreach(println)

    //將RDD使用自定義的分區類進行從新分區
    val par = listRDD.partitionBy(new CustomerPartitioner(3))

    //查看從新分區後的數據分佈
    par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect.foreach(println)
  }
}
相關文章
相關標籤/搜索