Just for fun, I wrote a small demo:
```scala
val rdd = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (2, "c")))
val sorted = rdd.sortByKey()
sorted.foreach(println)
val c = sorted.count()
```
Open the Spark UI, as shown in the screenshot: there is a job for sortByKey, which is a transformation. Why would a transformation trigger a job?
Looking at the source:
```scala
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
```
It creates a RangePartitioner. Drilling into it:
```scala
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      // ...
```
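To make the sample-size formula in the excerpt concrete, here is a small standalone sketch in plain Scala (no Spark needed) that recomputes the two numbers. The object and method names are hypothetical; only the two formulas come from the quoted code. Note also that, per the excerpt, when partitions <= 1 rangeBounds is simply empty, so sketch is never called and no sampling job is launched in that case.

```scala
// Hypothetical helper that mirrors the two sample-size formulas quoted from RangePartitioner.
object SampleSizeSketch {
  /** Returns (total target sample size, sample size per input partition). */
  def sampleSizes(outputPartitions: Int, inputPartitions: Int): (Double, Int) = {
    // Target roughly 20 samples per output partition, capped at one million.
    val sampleSize = math.min(20.0 * outputPartitions, 1e6)
    // Over-sample by 3x, assuming the input partitions are roughly balanced.
    val perPartition = math.ceil(3.0 * sampleSize / inputPartitions).toInt
    (sampleSize, perPartition)
  }

  def main(args: Array[String]): Unit = {
    // 4 output partitions sampled from 4 input partitions
    println(sampleSizes(4, 4)) // (80.0,60)
  }
}
```

For 4 output partitions over 4 input partitions the target is 80 samples, and each input partition contributes 60, i.e. 240 in total, which is the "over-sample a little bit" factor of 3.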
RangePartitioner samples the keys through a sketch method. Drilling into that:
```scala
def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2).sum
  (numItems, sketched)
}
```
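SamplingUtils.reservoirSampleAndCount is Spark-internal. Judging from how it is used above (`val (sample, n) = ...`), it returns a fixed-size sample of a partition together with that partition's total item count, in a single pass. Below is a standalone sketch of that idea using classic reservoir sampling (Algorithm R); the object name is hypothetical and this is not Spark's actual code.

```scala
import scala.reflect.ClassTag
import scala.util.Random

// Hypothetical standalone version of "reservoir sample + count" (Algorithm R).
object ReservoirSketch {
  /** Returns up to k uniformly sampled items and the total number of items seen. */
  def reservoirSampleAndCount[T: ClassTag](input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
    val reservoir = new Array[T](k)
    var count = 0L
    // Fill the reservoir with the first k items.
    while (count < k && input.hasNext) {
      reservoir(count.toInt) = input.next()
      count += 1
    }
    if (count < k) {
      // The input had fewer than k items: return all of them.
      (reservoir.take(count.toInt), count)
    } else {
      val rng = new Random(seed)
      while (input.hasNext) {
        val item = input.next()
        count += 1
        // The count-th item replaces a random slot with probability k / count.
        val j = (rng.nextDouble() * count).toLong
        if (j < k) reservoir(j.toInt) = item
      }
      (reservoir, count)
    }
  }
}
```

Because one pass yields both the sample and the count, sketch can sum the counts into numItems without a second job; the single collect over all partitions is what shows up as the extra job in the UI.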
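sketch ends with a call to collect, and collect is an RDD action, so it triggers a job. sortByKey itself is still a transformation, though: only this sampling runs eagerly, while the actual shuffle and sort wait for an action. Opening the job triggered by the foreach action (see screenshot), we can see that it goes through sortByKey.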
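So what is this code in RangePartitioner for? It samples the keys to work out the boundaries of each partition, and those boundaries decide where each record goes when the subsequent shuffle repartitions the data.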
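A simplified sketch of both halves of that job: deriving partition boundaries from a sample, then routing a key to a partition. It assumes Int keys and simply takes evenly spaced quantiles of the sorted sample; Spark's real code is more involved (for example, it weights each sample by the size of the partition it came from), so treat this as an illustration only. All names are hypothetical.

```scala
// Hypothetical, unweighted illustration of range partitioning.
object RangeBoundsSketch {
  /** Picks up to (partitions - 1) upper bounds as evenly spaced quantiles of the sample. */
  def determineBounds(sample: Seq[Int], partitions: Int): Array[Int] = {
    val sorted = sample.sorted.toArray
    if (partitions <= 1 || sorted.isEmpty) Array.empty[Int]
    else {
      val step = sorted.length.toDouble / partitions
      (1 until partitions)
        .map(i => sorted(math.min((i * step).toInt, sorted.length - 1)))
        .distinct // duplicate keys in the sample can collapse bounds
        .toArray
    }
  }

  /** Index of the first bound >= key; keys above every bound go to the last partition. */
  def getPartition(key: Int, bounds: Array[Int]): Int = {
    var lo = 0
    var hi = bounds.length
    // Binary search for the first bound that is >= key.
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (bounds(mid) < key) lo = mid + 1 else hi = mid
    }
    lo
  }
}
```

With the demo's keys (1, 2, 3, 2) and 2 partitions, the single bound is 2, so keys 1 and 2 go to partition 0 and key 3 to partition 1. Each bound is the upper bound of its partition, matching the "upper bounds for the first (partitions - 1) partitions" comment in the quoted code.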
Now open the job triggered by count:
Stage 3 was skipped. The code caches nothing, yet a stage can still be skipped.
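This is because sortByKey has a wide dependency and therefore causes a shuffle. During a shuffle, the upstream stage writes the RDD's data out to temporary files, which the downstream stage then reads. These temporary files (intermediate results) stay around during the SparkContext's lifetime unless they are cleaned up, so when the next job is triggered, Spark follows the RDD's dependencies, finds these files, and skips recomputing the upstream stage. In effect they act as a "cache".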
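So I added cache after sortByKey. The UI looked the same (screenshot not included here; this is explained below), which seemed to mean that sortByKey ran again. Was the cache not working, or is that just how the UI displays it?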
To check, I changed the code:
```scala
val rddA = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (2, "c")))
  .filter(x => x._1 > 1)
  .aggregateByKey(0)((x, y) => { println("agg: " + y); 0 }, (x1, x2) => 0)
  .cache() // cache the RDD produced by aggregateByKey
val c = rddA.count()
println("Total: " + c)
rddA.foreach(println)
```
Looking at the UI (screenshot above), aggregateByKey again appears to have run a second time! But I put a println inside aggregateByKey.
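As you can see, the "agg:" lines were printed only once, not once per job, which shows that aggregateByKey ran only once. The UI, however, can only show a whole stage as either gray or blue.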
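Moreover, this stage 3 does not read the temporary files produced by the shuffle; it reads the ShuffledRDD directly from the cache. The screenshot confirms it: Shuffle Read shows no data.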
PS: Mac fonts really do look better than Windows ones! Ayuthaya or Monaco both look better than Consolas~