Spark中foreachPartition和mapPartitions的區別

spark的運算操做有兩種類型:分別是Transformation和Action,區別以下:mysql

Transformation:表明的是轉化操做就是咱們的計算流程,返回是RDD[T],能夠是一個鏈式的轉化,而且是延遲觸發的。sql

Action:表明是一個具體的行爲,返回的值非RDD類型,能夠一個object,或者是一個數值,也能夠爲Unit表明無返回值,而且action會當即觸發job的執行。api

Transformation的官方文檔方法集合以下:bash

map
filter
flatMap
mapPartitions
mapPartitionsWithIndex
sample
union
intersection
distinct
groupByKey
reduceByKey
aggregateByKey
sortByKey
join
cogroup
cartesian
pipe
coalesce
repartition
repartitionAndSortWithinPartitions

Action的官方文檔方法集合以下:微信

reduce
collect
count
first
take
takeSample
takeOrdered
saveAsTextFile
saveAsSequenceFile
saveAsObjectFile
countByKey
foreach

結合平常開發好比經常使用的count,collect,saveAsTextFile他們都是屬於action類型,結果值要麼是空,要麼是一個數值,或者是object對象。其餘的如map,filter返回值都是RDD類型的,因此簡單的區分兩個不一樣之處,就能夠用返回值是否是RDD[T]類型來辨別。
接着回到正題,咱們說下foreachPartition和mapPartitions的分別,細心的朋友可能會發現foreachPartition並無出如今上面的方法列表中,緣由多是官方文檔並只是列舉了經常使用的處理方法,不過這並不影響咱們的使用,首先咱們按照上面的區分原則來看下foreachPartition應該屬於那種操做,官網文檔的這個方法api以下:this

public void foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)
 
Applies a function f to each partition of this RDD.
 
Parameters:
f - (undocumented)

從上面的返回值是空能夠看出foreachPartition應該屬於action運算操做,而mapPartitions是在Transformation中,因此是轉化操做,此外在應用場景上區別是mapPartitions能夠獲取返回值,繼續在返回RDD上作其餘的操做,而foreachPartition由於沒有返回值而且是action操做,因此使用它通常都是在程序末尾好比說要落地數據到存儲系統中如mysql,es,或者hbase中,能夠用它。spa

固然在Transformation中也能夠落地數據,可是它必須依賴action操做來觸發它,由於Transformation操做是延遲執行的,若是沒有任何action方法來觸發,那麼Transformation操做是不會被執行的,這一點須要注意scala

一個foreachPartition例子:code

def main(args: Array[String]): Unit = {
    val sp = new SparkConf();
    sp.setAppName("zhangzeli")
    sp.setMaster("local")
    val sc = new SparkContext(sp);
    val rdd =sc.parallelize(Seq(1,2,3,4,5,6),3);
    rdd.foreachPartition(p=>{
      p.foreach(line=>{
        // partiton.size 不能執行這個方法,不然下面的foreach方法裏面會沒有數據,
        //由於iterator只能被執行一次
        println(line)
      })
    });
    while (true){}

  }

一個mapPartitions例子:orm

val sparkConf=new SparkConf()
     val sc=new SparkContext(sparkConf)
      sparkConf.setAppName("spark demo example ")
    val rdd=sc.parallelize(Seq(1,2,3,4,5),3)
 
    rdd.mapPartitions(partiton=>{
      //只能用map,不能用foreach,由於foreach沒有返回值
      partiton.map(line=>{
        //save line
      }
      )
    })
 
    rdd.count()//須要action,來觸發執行
    sc.stop()

最後,須要注意一點,若是操做是iterator類型,咱們是不能在循環外打印這個iterator的size,一旦執行size方法,至關於iterato就會被執行,因此後續的foreach你會發現是空值的,切記iterator迭代器只能被執行一次。

個人微信張澤立,澤立,澤,微信

相關文章
相關標籤/搜索