經過map操做看RDD的Map過程

RDD中的map,flatMap等操做是怎麼串在一塊兒造成DAG圖的呢?這是個很重要的問題,理解了這一點才能更好的理解Spark的內核實現。本文經過map過程來試圖解釋這一點。ide

先看看RDD的一個子類:MapPartitionsRDD,它會用在map函數場景下。函數

它的定義:this

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev)

prev是父RDD,就是父類RDD的入參,在後面的代碼裏就是firstParent。spa

F表明了map函數的定義,其中第二個Int參數是分區索引號。咱們先無論這個f入參怎麼傳進來的,先看看MapPartitionsRDD須要作哪些事。code

前面說過,對於RDD來講,最重要的函數就是compute,MapPartitionsRDD的compute方法定義:索引

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

很明確,就是用當前的solit分區來執行入參的f函數!it

那麼,這個MapPartitionsRDD是怎麼產生的呢?原來是在RDD類中的map函數產生的:spark

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  }

這幾行代碼什麼意思?這裏仍是須要好好分析一下的。io

對照MapPartitionsRDD的定義,咱們知道:class

(_, _, iter) => iter.map(cleanF)

裏面的_,_表明TaskContext和分區索引,由於在MapPartitionsRDD的compute方法中已經有了split入參和context入參,因此在RDD中就不須要傳這兩個參數了。

iter表明要處理的數據集,在MapPartitionsRDD中的compute方法中定義爲:

firstParent[T].iterator(split, context)

函數就是第一個父類RDD的split分區的數據集。這裏就很清楚了,對這個數據集作cleanF操做(也就是sc.clean以後的map函數,sc.clean是去掉不能序列號的字節碼的意思,保證能夠序列化後分發到其餘節點執行)。

相關文章
相關標籤/搜索