Questions raised by sortByKey (job, shuffle, cache)

Just for fun, I wrote a little demo:

val rdd = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (2, "c")))
val sorted = rdd.sortByKey()
sorted.foreach(println)
val c = sorted.count()

1. Job

Open the Spark UI, as shown below:

(Figure: Spark UI jobs page — sortByKey shows up as a job of its own)

sortByKey is a transformation. Why would a transformation trigger a job?
Digging into the source:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

There is a RangePartitioner; stepping into it:

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      // ... (rest of the rangeBounds computation elided)

There is a sketch method; stepping into it:

def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }
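The heavy lifting inside sketch is SamplingUtils.reservoirSampleAndCount. Here is a minimal sketch of reservoir sampling, the one-pass technique it relies on (the function name and body below are my simplification, not Spark's exact implementation): keep a buffer of k items, and for the n-th item seen, overwrite a random slot with probability k/n.

import scala.reflect.ClassTag
import scala.util.Random

// Simplified reservoir sampling: one pass over an iterator of unknown length,
// returning a uniform random sample of at most k items plus the total count.
def reservoirSampleAndCountSketch[T: ClassTag](
    iter: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
  val rng = new Random(seed)
  val reservoir = new scala.collection.mutable.ArrayBuffer[T](k)
  var n = 0L // total number of items seen so far
  for (item <- iter) {
    n += 1
    if (reservoir.size < k) {
      reservoir += item // fill the reservoir first
    } else {
      val j = (rng.nextDouble() * n).toLong // uniform slot in [0, n)
      if (j < k) reservoir(j.toInt) = item  // replace with probability k/n
    }
  }
  (reservoir.toArray, n)
}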

There is a collect, and collect is an RDD action, so it triggers a job — even though sortByKey itself is still a transformation. Opening the job triggered by the foreach action, as shown below, it runs through sortByKey.

(Figure: DAG of the job triggered by foreach — it runs through the sortByKey stage)
So what is this code in RangePartitioner for? It samples the keys to compute the boundaries between partitions, which determine where each record goes when the subsequent shuffle repartitions the data.
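To make that concrete, here is a simplified sketch (the helper name is mine) of what RangePartitioner.getPartition does with the rangeBounds computed above: find the first partition whose upper bound is >= the key. The real implementation switches from this linear scan to binary search when there are many partitions.

def getPartitionSketch[K](key: K, rangeBounds: Array[K])
                         (implicit ord: Ordering[K]): Int = {
  var p = 0
  // advance past every upper bound that is still smaller than the key
  while (p < rangeBounds.length && ord.gt(key, rangeBounds(p))) {
    p += 1
  }
  p // keys greater than all bounds land in the last partition
}

// e.g. with rangeBounds Array(1, 3) (3 partitions):
// keys <= 1 -> partition 0, keys 2..3 -> partition 1, keys > 3 -> partition 2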

2. Shuffle

Opening the job triggered by count:

(Figure: the job triggered by count — stage 3 is marked as skipped)
Stage 3 was skipped, even though the code caches nothing.
This is because sortByKey is a wide-dependency operator and causes a shuffle. During a shuffle, the upstream stage writes the RDD's data out to temporary files, which the downstream stage then reads. Those temporary files (intermediate results) live for the lifetime of the SparkContext, so when the next job is triggered, Spark follows the RDD lineage back to them, and they effectively act as a "cache".
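The same effect is easy to reproduce with any shuffle operator; a minimal standalone sketch (hypothetical snippet, nothing cached anywhere):

val shuffled = sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _)
shuffled.count()   // job 1: both stages run; shuffle files are written
shuffled.collect() // job 2: the map-side stage shows up as "skipped" in the UI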
So I added cache() after sortByKey. The UI looked exactly the same (not pasted here; more on this below), which seemed to mean sortByKey had executed again. Is cache useless, or is this just how the UI displays things?

3. Cache

To answer that question, I changed the code:

val rddA = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (2, "c")))
  .filter(x => x._1 > 1)
  .aggregateByKey(0)(
    (x, y) => { println("agg: " + y); 0 },
    (x1, x2) => 0
  ).cache() // cache the aggregated RDD
val c = rddA.count()
println("total: " + c)
rddA.foreach(println)

Looking at the UI:

(Figure: Spark UI for the new code — the aggregateByKey stage appears to run again)

如上,彷佛aggregateByKey又執行了一遍!我代碼中在aggregateByKey裏打印了。

(Figure: driver console output — the "agg:" lines appear only during the first job)

As you can see, the "agg:" lines were printed only once, so aggregateByKey executed only once; the UI simply cannot color anything finer-grained than a whole stage gray (skipped) or blue (run).
Furthermore, this stage 3 does not go back and read the temporary shuffle files; it reads the ShuffledRDD straight from the cache. The screenshot proves it:

(Figure: stage detail page — the Shuffle Read column is empty)
Shuffle Read shows no data.
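Beyond the empty Shuffle Read column, one more way to confirm the cache is in play (a quick sketch; the exact output format varies by Spark version):

println(rddA.getStorageLevel) // e.g. StorageLevel(memory, deserialized, 1 replicas) after cache()
// The Spark UI "Storage" tab should also list the cached ShuffledRDD and its partitions.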

PS:的字體確實比win的好看!Ayuthaya或Manaco都比Console好看~
