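One of Spark's most important features is control over how a dataset is partitioned across the nodes of a cluster. Controlling data placement can reduce network traffic and greatly improve overall performance.
Only pair RDDs (RDDs of key-value pairs) can carry a partitioner; for any other RDD the partitioner value is None. Partitioning is not always worthwhile. If an RDD is scanned only once, there is no need to partition it in advance. Partitioning pays off only when the RDD is used repeatedly in key-based operations such as joins.
A partitioner determines how many partitions an RDD has and which partition each record ends up in.
Spark provides two partitioners, HashPartitioner and RangePartitioner. Both extend the org.apache.spark.Partitioner class and implement three methods: numPartitions (the number of partitions), getPartition (the partition ID for a given key) and equals (which Spark uses to tell whether two RDDs are partitioned the same way).
How HashPartitioner works: for a given key, it computes the key's hashCode and takes it modulo the number of partitions, shifting a negative remainder back into range. The result is the ID of the partition the key belongs to; a null key always goes to partition 0. The implementation: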
```scala
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
```
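Why getPartition goes through Utils.nonNegativeMod rather than a plain %: in Scala, as in Java, % keeps the sign of the dividend, so a key with a negative hashCode would otherwise get a negative partition ID. The standalone sketch below (plain Scala, no Spark dependency) mirrors the rule in the implementation above. HashPartitionSketch and its methods are hypothetical helpers written for illustration, not Spark's public API (Utils is internal to Spark).

```scala
// Illustrative re-implementation of the rule HashPartitioner.getPartition follows;
// these helpers are hypothetical and not part of Spark's public API.
object HashPartitionSketch {
  // Mirrors Spark's internal Utils.nonNegativeMod: % keeps the sign of the dividend,
  // so a negative remainder is shifted back into [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // null keys go to partition 0; any other key is placed by hashCode modulo numPartitions.
  def hashPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

For example, -7 % 3 is -1 in Scala, whereas nonNegativeMod(-7, 3) is 2; and since "a".hashCode is 97, the key "a" lands in partition 97 mod 4 = 1 out of 4.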
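HashPartitioner can leave partitions with very uneven amounts of data. RangePartitioner instead tries to keep partition sizes balanced by mapping each contiguous range of keys to one partition. The partitions are ordered relative to each other (in ascending order, every key in one partition is smaller than every key in the next), but the order of elements within a partition is not guaranteed.
How RangePartitioner works (annotated source):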
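A small Spark-free illustration of the skew problem, assuming 100 Int keys that are all multiples of 4 and 4 target partitions (SkewDemo and its methods are hypothetical). Because an Int's hashCode is the value itself, the hash rule sends every key to partition 0, while range bounds taken from the sorted keys split them evenly. For simplicity the bounds here are exact quantiles of the keys; RangePartitioner estimates them from a sample instead.

```scala
// Hypothetical demo: 100 Int keys that are all multiples of 4, split into 4 partitions.
object SkewDemo {
  val numPartitions = 4
  val keys: Seq[Int] = (0 until 100).map(_ * 4) // 0, 4, 8, ..., 396

  // Hash rule: an Int's hashCode is the value itself, so every key here maps to 0.
  def hashPartition(key: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Simplified range rule: exact quantiles of the sorted keys serve as upper bounds
  // for the first numPartitions - 1 partitions (no sampling involved).
  val bounds: Array[Int] = {
    val sorted = keys.sorted
    (1 until numPartitions).map(i => sorted(i * sorted.size / numPartitions - 1)).toArray
  }

  // A key goes to the first partition whose upper bound is >= the key, else the last one.
  def rangePartition(key: Int): Int = {
    val i = bounds.indexWhere(b => key <= b)
    if (i < 0) bounds.length else i
  }

  // Number of keys landing in each partition under a given rule.
  def sizes(rule: Int => Int): Seq[Int] =
    (0 until numPartitions).map(p => keys.count(k => rule(k) == p))
}
```

With this data the hash rule produces partition sizes 100, 0, 0 and 0, while the range rule uses the bounds 96, 196 and 296 and puts 25 keys in each partition.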
```scala
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  // Ordering for the RDD's key type K
  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      // With at most one target partition there is nothing to split, so no bounds are needed
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // About 20 sampled keys per target partition, but never more than 1M (10^6) in total
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // Input partitions may be skewed: oversampling 3x lets small partitions contribute
      // enough keys, while oversized partitions are re-sampled in a second pass below
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      // Sample keys from every input partition; returns
      // (total item count, Array[(partition id, item count of that partition, sampled keys)])
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        // The RDD is empty: return an empty array of bounds
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // Desired sampling rate: total sample size over total item count, capped at 1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // Weighted sample keys used to compute the bounds
        val candidates = ArrayBuffer.empty[(K, Float)]
        // IDs of imbalanced partitions (fraction * item count exceeds sampleSizePerPartition)
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // The first pass sampled too few keys relative to this partition's size,
            // so mark it for re-sampling
            imbalancedPartitions += idx
          } else {
            // Balanced partition: weight each sampled key and add it to candidates.
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // Build an RDD containing only the imbalanced partitions
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // Random seed for sampling
          val seed = byteswap32(-rdd.id - 1)
          // Draw the second-pass sample with RDD.sample at rate `fraction`
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        // Compute rangeBounds from the final weighted sample
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  // The partitioned RDD has rangeBounds.length + 1 partitions
  def numPartitions: Int = rangeBounds.length + 1

  // Binary-search helper; internally uses java.util.Arrays.binarySearch
  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  // Return the partition ID (starting from 0) for a key
  def getPartition(key: Any): Int = {
    // Cast the key back to the RDD's key type
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      // (at most 128 bounds: scan linearly for the first bound >= k)
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      // More than 128 bounds: use binary search. If k is not itself a bound, the search returns
      // a negative value that is converted back to the insertion point; an insertion point of
      // rangeBounds.length means k is larger than every bound and goes to the last partition
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    // Ascending order (the default) keeps the index; descending order mirrors it
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }
}
```
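The two core steps of RangePartitioner, turning weighted samples into rangeBounds and then looking keys up, can be mimicked without Spark. The sketch below is a simplified, hypothetical re-implementation specialised to Int keys: RangeSketch and its methods are names chosen for illustration, while Spark's own determineBounds is generic over K and internal to Spark. It follows the logic of the excerpt above (sample-size arithmetic, cumulative-weight bounds with duplicate keys skipped, linear scan versus binary search) but is an illustration, not Spark's code.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified, Spark-free sketch of RangePartitioner's bound computation and key lookup,
// specialised to Int keys. Names mirror the excerpt above but are hypothetical helpers.
object RangeSketch {
  // Sample-size arithmetic from the excerpt: about 20 sampled keys per output partition
  // (capped at 1e6 in total), oversampled 3x and divided across the input partitions.
  def sampleSizePerPartition(partitions: Int, inputPartitions: Int): Int = {
    val sampleSize = math.min(20.0 * partitions, 1e6)
    math.ceil(3.0 * sampleSize / inputPartitions).toInt
  }

  // Walk the weighted samples in key order and emit a bound each time the cumulative weight
  // reaches the next multiple of totalWeight / partitions, skipping duplicate keys.
  def determineBounds(candidates: Seq[(Int, Float)], partitions: Int): Array[Int] = {
    val ordered = candidates.sortBy(_._1).toArray
    val step = ordered.map(_._2.toDouble).sum / partitions
    val bounds = ArrayBuffer.empty[Int]
    var cumWeight = 0.0
    var target = step
    var i = 0
    while (i < ordered.length && bounds.length < partitions - 1) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight >= target && (bounds.isEmpty || key > bounds.last)) {
        bounds += key
        target += step
      }
      i += 1
    }
    bounds.toArray
  }

  // Same lookup as getPartition: linear scan for up to 128 bounds, binary search beyond.
  def getPartition(bounds: Array[Int], key: Int, ascending: Boolean = true): Int = {
    var partition = 0
    if (bounds.length <= 128) {
      while (partition < bounds.length && key > bounds(partition)) partition += 1
    } else {
      partition = java.util.Arrays.binarySearch(bounds, key)
      // A miss returns -(insertion point) - 1; convert it back to the insertion point
      if (partition < 0) partition = -partition - 1
      if (partition > bounds.length) partition = bounds.length
    }
    if (ascending) partition else bounds.length - partition
  }
}
```

With keys 1 to 100, each carrying weight 1.0, and 4 target partitions, determineBounds yields the bounds 25, 50 and 75. Key 26 then goes to partition 1 and key 100 to partition 3 (or to partition 0 with ascending = false). The 128-bound threshold only decides how the search is done; both branches return the same partition.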
Operations that affect partitioning include: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), repartition(), coalesce(), sortByKey(), mapValues() (if the parent RDD has a partitioner) and flatMapValues() (if the parent RDD has a partitioner).
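For operations on two RDDs, the partitioning of the output depends on how the parent RDDs are partitioned. By default the result is hash-partitioned, with as many partitions as the operation's level of parallelism. If one of the parents already has a partitioner, the result uses that partitioner. If both parents have one, the result is often described as taking the first parent's partitioner; Spark's Partitioner.defaultPartitioner actually tends to reuse the partitioner of the parent with more partitions.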
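repartition and partitionBy both redistribute data across partitions, and both typically rely on a HashPartitioner: repartition always uses one internally, and partitionBy uses whatever partitioner it is given, most often a HashPartitioner. They differ in what gets hashed. partitionBy (available only on pair RDDs) hashes each record's own key, so all records with the same key end up in the same partition, which is usually the result we want. repartition ignores the original key: it gives each record a generated integer key (consecutive values starting from a random offset) and hashes that instead, which spreads records evenly but does not group them by key. The relevant source: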
```scala
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
    new CoalescedRDD(this, numPartitions)
  }
}
```
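The key point of distributePartition is that the shuffle key is not the record's own key. The Spark-free sketch below (plain Scala; RepartitionSketch and its methods are hypothetical names) mimics that step for one input partition: each record gets the next consecutive integer after a random offset seeded by the partition index, and the HashPartitioner's modulo then deals records out round-robin. Even records that share the same original key are therefore split across output partitions.

```scala
import scala.util.Random

// Spark-free simulation of coalesce(shuffle = true)'s distributePartition step.
// RepartitionSketch is a hypothetical helper for illustration, not part of Spark.
object RepartitionSketch {
  // For one input partition: start at a random position seeded by the partition index,
  // then give each item the next consecutive Int as its shuffle key.
  def distributePartition[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    var position = new Random(index).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t)
    }
  }

  // HashPartitioner on the generated Int key: an Int's hashCode is itself, so this is a mod.
  def targetPartition(key: Int, numPartitions: Int): Int = {
    val raw = key % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Output partition sizes when one input partition's items are redistributed.
  def outputSizes[T](index: Int, items: Seq[T], numPartitions: Int): Seq[Int] = {
    val targets = distributePartition(index, items.iterator, numPartitions)
      .map { case (k, _) => targetPartition(k, numPartitions) }
      .toVector
    (0 until numPartitions).map(p => targets.count(_ == p))
  }
}
```

For instance, ten records that all carry the same original key, redistributed into 3 partitions, always land as 3, 3 and 4 records; only which partition receives the extra record depends on the random offset.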
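Both operators change how an RDD is divided into partitions; repartition is simply coalesce with shuffle set to true. Suppose an RDD has N partitions and is to be redistributed into M partitions. Without a shuffle, coalesce can only merge existing partitions through a narrow dependency, so it can reduce the partition count (M < N) but not increase it: if M > N, the RDD keeps its N partitions, and increasing the partition count requires shuffle = true. When M is much smaller than N, a shuffle can also be worth its cost, because a shuffle-free coalesce runs the upstream computation in only M tasks.
Example: count how often users visit pages on topics they are not subscribed to.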
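The partition-count behavior of the two operators fits in a few lines. The hypothetical model below (plain Scala, not Spark code) encodes what the RDD.coalesce documentation describes: with shuffle = true, which is what repartition passes, the result has exactly M partitions; without a shuffle, partitions can only be merged, so asking for more than N leaves N.

```scala
// Hypothetical model (not Spark code) of the partition count produced when an RDD with
// n partitions is coalesced or repartitioned to m partitions.
object CoalesceSketch {
  def resultingPartitions(n: Int, m: Int, shuffle: Boolean): Int = {
    require(m > 0, "target partition count must be positive")
    if (shuffle) m            // repartition(m), i.e. coalesce(m, shuffle = true): m partitions
    else math.min(n, m)       // no shuffle: partitions can only be merged, never split
  }
}
```

So with N = 10, coalesce(20) still yields 10 partitions while repartition(20) yields 20, and with N = 1000 both coalesce(100) and repartition(100) yield 100, the former without a shuffle.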
```scala
import org.apache.spark.SparkContext

// UserID, UserInfo and LinkInfo are application-defined types (not shown)
val sc = new SparkContext()
// User profiles, including the topics each user subscribes to
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

// Called for each new log file of link-click events
def processNewLogs(logFileName: String): Unit = {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    // Expand the tuple into its components
    case (userId, (userInfo, linkInfo)) =>
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
```
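A join computes the hash of every key in both datasets, sends records whose keys hash to the same value across the network to the same machine, and then joins the records with matching keys there. userData is large, and this code re-hashes and re-shuffles it on every call to processNewLogs even though it never changes, so the hashing and cross-node shuffling are very expensive.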
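```scala
import org.apache.spark.HashPartitioner

val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
  .partitionBy(new HashPartitioner(100)) // hash-partition by user ID into 100 partitions
  .persist()                             // keep the partitioned result for later joins
```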
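Here userData is repartitioned so that all records with the same key land in the same partition, and persist() keeps that partitioned result, so the hashing and cross-node shuffle of userData are not repeated on every call. Because Spark now knows that userData is hash-partitioned, the join only needs to shuffle events, sending each event to the machine that holds the matching userData partition. This can make the program run considerably faster.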
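A rough, Spark-free model of why this helps: userData is bucketed by key hash once and kept (the analogue of partitionBy followed by persist), and each batch of events is then routed to the bucket that owns its key and joined locally there, so only the events have to move. CoPartitionSketch, partitionOf and the UserInfo/LinkInfo case classes are hypothetical stand-ins for illustration; in Spark the routing step corresponds to a shuffle of the events RDD across the network.

```scala
// Spark-free model of joining against a pre-partitioned, cached userData.
// All names here are hypothetical stand-ins for the application types and Spark machinery.
object CoPartitionSketch {
  final case class UserInfo(topics: Set[String])
  final case class LinkInfo(topic: String)

  // Hash rule for Int user IDs, with negative remainders shifted into range.
  def partitionOf(userId: Int, numPartitions: Int): Int = {
    val raw = userId.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Done once: bucket userData by key hash (like partitionBy(new HashPartitioner(n)) + persist()).
  def prePartition(userData: Map[Int, UserInfo], numPartitions: Int): Vector[Map[Int, UserInfo]] =
    (0 until numPartitions).toVector.map { p =>
      userData.filter { case (id, _) => partitionOf(id, numPartitions) == p }
    }

  // Per log file: route each event to its key's bucket and join locally (inner-join semantics:
  // events without a matching user are dropped), counting off-topic visits.
  def offTopicVisits(parts: Vector[Map[Int, UserInfo]], events: Seq[(Int, LinkInfo)]): Int =
    events.count { case (id, link) =>
      parts(partitionOf(id, parts.length)).get(id).exists(info => !info.topics.contains(link.topic))
    }
}
```

The expensive bucketing of userData happens once in prePartition; each call to offTopicVisits only computes a partition for each event.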
Dedicated to technology and glad to share. Follow the WeChat official account java大數據編程 for more technical content.