spark的運算操做有兩種類型:分別是Transformation和Action,區別以下:mysql
Transformation:表明的是轉化操做就是咱們的計算流程,返回是RDD[T],能夠是一個鏈式的轉化,而且是延遲觸發的。sql
Action:表明是一個具體的行爲,返回的值非RDD類型,能夠一個object,或者是一個數值,也能夠爲Unit表明無返回值,而且action會當即觸發job的執行。api
Transformation的官方文檔方法集合以下:bash
map filter flatMap mapPartitions mapPartitionsWithIndex sample union intersection distinct groupByKey reduceByKey aggregateByKey sortByKey join cogroup cartesian pipe coalesce repartition repartitionAndSortWithinPartitions
Action的官方文檔方法集合以下:微信
reduce collect count first take takeSample takeOrdered saveAsTextFile saveAsSequenceFile saveAsObjectFile countByKey foreach
結合平常開發好比經常使用的count,collect,saveAsTextFile他們都是屬於action類型,結果值要麼是空,要麼是一個數值,或者是object對象。其餘的如map,filter返回值都是RDD類型的,因此簡單的區分兩個不一樣之處,就能夠用返回值是否是RDD[T]類型來辨別。
接着回到正題,咱們說下foreachPartition和mapPartitions的分別,細心的朋友可能會發現foreachPartition並無出如今上面的方法列表中,緣由多是官方文檔並只是列舉了經常使用的處理方法,不過這並不影響咱們的使用,首先咱們按照上面的區分原則來看下foreachPartition應該屬於那種操做,官網文檔的這個方法api以下:this
public void foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f) Applies a function f to each partition of this RDD. Parameters: f - (undocumented)
從上面的返回值是空能夠看出foreachPartition應該屬於action運算操做,而mapPartitions是在Transformation中,因此是轉化操做,此外在應用場景上區別是mapPartitions能夠獲取返回值,繼續在返回RDD上作其餘的操做,而foreachPartition由於沒有返回值而且是action操做,因此使用它通常都是在程序末尾好比說要落地數據到存儲系統中如mysql,es,或者hbase中,能夠用它。spa
固然在Transformation中也能夠落地數據,可是它必須依賴action操做來觸發它,由於Transformation操做是延遲執行的,若是沒有任何action方法來觸發,那麼Transformation操做是不會被執行的,這一點須要注意scala
一個foreachPartition例子:code
def main(args: Array[String]): Unit = { val sp = new SparkConf(); sp.setAppName("zhangzeli") sp.setMaster("local") val sc = new SparkContext(sp); val rdd =sc.parallelize(Seq(1,2,3,4,5,6),3); rdd.foreachPartition(p=>{ p.foreach(line=>{ // partiton.size 不能執行這個方法,不然下面的foreach方法裏面會沒有數據, //由於iterator只能被執行一次 println(line) }) }); while (true){} }
一個mapPartitions例子:orm
val sparkConf=new SparkConf() val sc=new SparkContext(sparkConf) sparkConf.setAppName("spark demo example ") val rdd=sc.parallelize(Seq(1,2,3,4,5),3) rdd.mapPartitions(partiton=>{ //只能用map,不能用foreach,由於foreach沒有返回值 partiton.map(line=>{ //save line } ) }) rdd.count()//須要action,來觸發執行 sc.stop()
最後,須要注意一點,若是操做是iterator類型,咱們是不能在循環外打印這個iterator的size,一旦執行size方法,至關於iterato就會被執行,因此後續的foreach你會發現是空值的,切記iterator迭代器只能被執行一次。
個人微信