Spark中的RDD究竟是怎麼玩的?

Spark是開源的分佈式計算引擎,基於RDD來構造數據處理流程,並在集羣間調度任務,經過分區數據管理機制來劃分任務的並行度,並在任務之間交換分區數據,實現分佈式的數據處理。java

RDD是Spark中最重要的概念,理解了RDD是什麼,基本也就理解了一半Spark的內部機密了。node

 

一、RDD基類算法

RDD是Spark中表示數據集的基類,是可序列號的對象,所以RDD可在Spark節點中複製。RDD定義了數據迭代器來循環讀取數據,以及在數據集上定義各種轉換操做,生成新的RDD。app

RDD的各類算子會觸發生成新的RDD。如:分佈式

map操做生成MapPartitionsRDD。ide

filter操做也生成MapPartitionsRDD,filter操做實際上是在以前的RDD迭代器上封裝了一層filter操做,其實仍是第一個迭代器,只不過這個迭代器會拋棄掉一些不知足的記錄。函數

RDD的計算過程是經過compute方法來觸發的。oop

1.1 RDD觸發任務this

submit過程是提交spark程序到集羣,這時候會觸發application事件和driver事件等,並經過master節點選擇對應的node來建立app和driver,同時在node上執行spark jar包裏的main方法。但task的真正執行要等到RDD的compute動做來觸發的。編碼

RDD經過compute觸發任務,提交FinalStage給Dag執行。如collect(),count()等方法都會觸發compute過程,間接提交任務。

RDD.compute()=> finalStage => dag.submitJob()=> submitMissingStage() .

dag.submitJob()=> scheduleImpl.launchTask()=>scheduleBackend => executorBackend=> executor.launchTask()=> executorBackend.taskComplete msg => scheduleBackend.taskCompleted=>dag.stageFinished()=> ...

上面是RDD提交任務的大體流程。Compute函數是觸發函數,這會致使最後一個RDD被執行,也是finalStage;finalStage調用DAG的submitJob函數提交stage,這裏的stage就是finalStage。

Stage是從源頭到finalStage串起來的,執行的時候是反向尋找的,這句話要好好體會,這個過程其實就是RDD的祕密了。

咱們先看下RDD的經典圖例。圖中中間的部分Transformation是RDD的計算過程,左邊的HDFS示意數據源,右邊的HDFS示意RDD的finalStage執行的操做(圖中的操做是寫入hdfs,固然也能夠是print操做等等,就看你怎麼寫了)。

Stage1和stage2是窄依賴,map和union都是窄依賴;stage3是寬依賴,這裏是join操做。窄依賴的意思就是操做只依賴一個stage的數據,寬依賴的意思是依賴於多個stage,對這多個stage的數據要作全鏈接操做。

1.二、RDD執行示例

RDD經過runJob調用來得到執行,以下:

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

Sc是SparkContext。

對每一個分區執行func操做,返回結果是一個長度等於分區數的Array。

Sc.runJob再調dagScheduler.runJob方法。具體能夠看DagScheduler的做業執行步驟,這裏先不說,看筆者的專門論述DagScheduler的文章。

1.三、迭代器

RDD實際執行是經過迭代器讀取數據的。

RDD是抽象類,定義了幾個接口:

分別是getPartitions、compute、getPreferredLocations。RDD數據是分區存儲,每個分區可能分佈在申請spark資源的任何位置。這三個接口能夠描述RDD的所有信息,其中getPreferredLocations這個方法是和計算本地化有關的,這裏咱們就先忽略它,不影響咱們理解RDD的原理。

override protected def getPartitions: Array[Partition] = {}
override def compute(split: Partition, context: TaskContext): Iterator[java.lang.Integer] = new NextIterator[Integer] {}

getPartitions方法咱們也不用太關注,它的做用是返回一個分區列表,表示這個RDD有幾個分區,實際運行的時候RDD的每一個分區會被安排到單獨的節點上運行,這樣來實現分佈式計算的。

咱們最關心的是compute的方法,這個方法返回一個迭代器,這個迭代器就是這個RDD的split這個分區的數據集。至於這個迭代器的數據是什麼,是在compute方法體中寫代碼來生成的。咱們能夠定義本身的RDD,只要寫代碼實現這幾個方法就能夠了!

自定義RDD有什麼好處呢?最大的好處就是能夠把本身的數據集歸入到Spark的分佈式計算體系中,幫助你實現數據分區,任務分配,和其餘RDD執行全鏈接匯聚操做等。

言歸正傳,回到compute方法自己。

怎麼得到Iterator[T],對ShuffleRDD來講是從BlockManager獲取迭代器Iterator[T]。這種迭代器是blockResult,是ShuffleMapTask執行結果的保存格式;另外一種就是直接得到iter,這種是ResultTask的執行結果的數據。

第一種狀況,看BlockManager可否找到本RDD的partition的BlockResult。看看getOrElseUpdate方法還傳遞了一個函數做爲最後一個入參,若是不存在指定的BlockResult,則返回入參函數來計算獲得iter,方法體定義以下:

() => {
  readCachedBlock = false
  computeOrReadCheckpoint(partition, context)
}

主要就是調用computeOrReadCheckpoint方法計算分區。

def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

computeOrReadCheckpoint獲得Iterator,若是是checkpoint的那麼調用第一個父類的iterator方法獲得Iterator,這裏父類就是CheckpointRDD;不然就是調用compute方法獲得Iterator。

因此,RDD的迭代器的實際獲取分紅兩步:

首先,判斷是否存在該RDD指定partition的BlockResult,若是存在則將BlockResult做爲Iterator結果,此時表示該RDD是shuffleRDD之類。

而後若是上述不知足,則又分兩種狀況,第一種這是checkpoint的RDD,則調用父RDD的iterator方法(此時父RDD就是CheckpointRDD);不然調用compute方法來得到Iterator。

二、Stage劃分

咱們知道RDD的提交Spark集羣執行是分階段劃分Stage提交的。從最後一個Stage開始,依次循環遞歸判斷是否要調用依賴RDD的Stage,Stage的劃分是根據是否要Shuffle做爲分界點的。

若是某個RDD的依賴(dep)是ShuffleDependency,則次RDD做爲ShuffleMapTask任務提交,不然最後一個RDD做爲ResultTask提交。

遞歸提交Stage,對ShuffleMapTask類型的RDD,會一直遞歸判斷該RDD是否存在前置的ShuffleDependency,若是存在則遞歸提交前依賴RDD。

整個Spark做業是RDD串接的,若是不存在Shuffle依賴,則提交最後一個RDD,而且只有這一個RDD被提交。在計算最後一個RDD的iterator時,被調用到父RDD的iterator方法,此時父RDD通常都是MapPartitionsRDD。在MapPartitionsRDD中有進一步敘述。

三、RDD子類

RDD含有多個子類,如MapPartitionRDD,HadoopRDD、CoGroupedRDD等等。筆者這裏就找幾個例子簡單說明一下他們的內部邏輯。 

3.1 MapPartitionsRDD

MapPartitionsRDD是RDD的子類,前面看到RDD的諸多算子都會生成新的MapPartitionRDD。

MapPartitionsRDD的構造函數須要入參f,它是一個函數抽象類或者叫作泛類。

f: (TaskContext, Int, Iterator[T]) => Iterator[U]

f的入參有三個:

(1) TaskContext:是任務上下文

(2) Int:是分區編碼

(3) Iterator[T]是分區迭代器

f的輸出也是一個Iterator迭代器。能夠看出,f是一個抽象的從一個迭代器生成另外一個迭代器的自定義函數。對數據的處理邏輯就是體如今f上。

MapPartitionRDD中觸發計算的compute方法定義以下:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

這裏的f是MapPartitionRDD的構造函數中傳進入的入參,是用戶自定義的map函數。這樣,經過RDD的map、flatmap等算子和MapPartitionRDD,能夠將RDD上的一系列操做不停的串聯下去。

3.2 CoalescedRDD

CoalescedRDD將M個分區的RDD從新分紅N個分區,造成新的RDD。在計算過程當中,會引發Shuffle工程。

首先CoalescedRDD須要一個從新分區算法,將M個分區如何劃分到N個分區,這裏M>N。從新分區的結果是N的每一個分區對應了M的多個分區,用List<Int>來表示,List<Int>中每一個Int表示父RDD中M個分區之一的編號。

若是CoalescedRDD沒有指定本身的從新分區算法,則用DefaultPartitionCoalescer來作從新分區計算。

CoalescedRDD的compute過程以下:

override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
  partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { 
    parentPartition => firstParent[T].iterator(parentPartition, context)
  }
}

partition.parents是指CoalescedRDD的第partition分區所對應的父RDD的分區列表,對分區列表的每一個分區,執行:

firstParent[T].iterator(parentPartition, context)

而後獲得最終的Iterator[T]。這段應該不難理解。

須要留意的是,這裏獲得的Iterator[T]最終是要寫到Shuffle的,由於CoalescedRDD對應的ShuffleMapTask而不是ResultTask。

對於理解Spark計算流程來講,理解了Shuffle的過程,也就解決了一半的疑惑了。

相關文章
相關標籤/搜索