Spark:foreach和foreachpartition的區別

1、RDD基礎

1.RDD分佈式數據集的五大特性

(1)A list of partitions數據庫

(2)A function for computing each split緩存

(3)A list of dependencies on other RDDs微信

(4)Optionally,a Partitioner for key-value RDDs網絡

(5)Optionally,a list of preferred locations to compute each split閉包

2.RDD的操做類型

 Transformations:轉換,lazy型,不會觸發計算分佈式

 Action:觸發jobide

 Persist:緩存也不會觸發job,在第一次觸發job以後纔會真正進行緩存函數

3.RDD的計算

RDD的計算實際上咱們能夠分爲兩大部分。性能

1)Driver端的計算this

主要是stage劃分,task的封裝,task調度執行

2)Executor端的計算

真正的計算開始,默認狀況下每一個cpu運行一個task。一個task實際上就是一個分區,咱們的方法不管是轉換算子裏封裝的,仍是action算子裏封裝的都是此時在一個task裏面計算一個分區的數據。

2、源碼相關

1.第一次封裝

/**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }

能夠看到方法經過clean操做(清理閉包,爲序列化和網絡傳輸作準備),進行了一次匿名函數的封裝, 針對foreach方法,是咱們的方法被傳入了迭代器foreach(每一個元素遍歷執行一次函數), 而對於foreachpartition方法是迭代器被傳入了咱們的方法(每一個分區執行一次函數,咱們獲取迭代器後須要自行進行迭代處理)

2.第二次封裝

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }

就是講上述封裝的方法進一步按照匿名函數封裝

(ctx:TaskContext,it:Iterator[T] => cleanFunc(it))

3.執行的時候

Spark的Task類型咱們用到的也就兩個

1)shuffleMapTask

2)ResultTask

Action算子的方法是在ResultTask中執行的,也即ResultTask的runTask方法。

首先反序列化獲得咱們的方法和RDD,而後執行。傳入的是迭代器

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

3、總結

RDD.foreach(foreachFunction)

RDD.foreachPatition(foreachPartitionFunction)

通過第二部分析咱們能夠理解,展開以後實際上就是

RDD的每一個分區的iterator:

iterator.foreach(foreachFunction)

foreachPartitionFunction(iterator)

這就很明顯了,假如咱們的Function中有數據庫,網絡TCP等IO鏈接,文件流等等的建立關閉操做,採用foreachPartition方法,針對每一個分區集合進行計算,更能提升咱們的性能。

張澤立微信

相關文章
相關標籤/搜索