Note: some functions exist only on pair RDDs (RDDs of key-value pairs) and not on plain RDDs, e.g. groupByKey, reduceByKey, sortByKey, join, and cogroup, which group or combine elements by key.
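In Scala these operations become available once the element type is a (K, V) pair, via the implicit conversion to PairRDDFunctions. A minimal sketch (assuming sc is an existing SparkContext):
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
pairs.reduceByKey(_ + _)   // compiles: the elements are (String, Int) pairs
// sc.parallelize(List(1, 2, 3)).reduceByKey(_ + _)   // would not compile: not a pair RDD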
RDD basic transformations:
RDD[U] map(f: T => U)    T: element type of the source RDD; U: element type of the new RDD
Applies f to each element T to produce a new element U.
rdd.map(x => x + 1)
{1, 2, 3, 3} => {2, 3, 4, 4}
RDD[U] flatMap(f: T => TraversableOnce[U])    TraversableOnce: the common parent of collections and iterators
Applies f to each element T to produce a collection of new U elements, then flattens those collections (two levels into one) to form the new RDD.
rdd.flatMap(x => x.to(3))
{1, 2, 3, 3} => {1, 2, 3, 2, 3, 3, 3}
RDD[T] filter(f: T => Boolean)
Tests each element with f; the elements that pass form the new RDD.
rdd.filter(x => x != 1)
{1, 2, 3, 3} => {2, 3, 3}
RDD[T] distinct()
Removes duplicates.
rdd.distinct()
{1, 2, 3, 3} => {1, 2, 3}
RDD[U] mapPartitions(f: Iterator[T] => Iterator[U])
Like map, but transforms one partition at a time: all elements of a partition are wrapped in an Iterator and passed to f in a single call, instead of one function call per element as with map. So f is called once per partition.
With N elements in M partitions, map's function is called N times, while mapPartitions' function is called M times.
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr, 2)
rdd.mapPartitions((it: Iterator[Int]) => { var l = List[Int](); it.foreach((e: Int) => l = e * 2 :: l); l.iterator })
=> {2, 4, 6, 8, 10}
RDD[U] mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U])
Like mapPartitions, except that the function takes an extra partition-index parameter; see the sketch below.
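A minimal sketch (assuming sc is an existing SparkContext) that tags each element with the index of the partition it lives in:
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5), 2)   // partitions: [1, 2] and [3, 4, 5]
rdd.mapPartitionsWithIndex((idx: Int, it: Iterator[Int]) => it.map(e => (idx, e))).collect()
=> Array((0,1), (0,2), (1,3), (1,4), (1,5))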
RDD[T] union(other: RDD[T])
Union of two RDDs, keeping duplicate elements.
rdd.union(otherRdd)
{1, 2, 2, 3, 3} {3, 4, 5} => {1, 2, 2, 3, 3, 3, 4, 5}
RDD[T] intersection(other: RDD[T])
Intersection of two RDDs.
rdd.intersection(otherRdd)
{1, 2, 2, 3, 3} {3, 4, 5} => {3}
RDD[T] subtract(other: RDD[T])
Difference of two RDDs: the elements of this RDD that do not appear in the other.
rdd.subtract(otherRdd)
{1, 2, 2, 3, 3} {3, 4, 5} => {1, 2, 2}
RDD[(T, U)] cartesian(other: RDD[U])
Cartesian product of two RDDs.
rdd.cartesian(otherRdd)
{1, 2} {3, 4} => {(1,3), (1,4), (2,3), (2,4)}
RDD[T] sortBy(f: (T) => K, ascending: Boolean, numPartitions: Int)
Sorts by the value each element maps to under the conversion function f: (T) => K.
rdd.sortBy(_._2, false, 1)   // sort by value in descending order
{("leo", 65), ("tom", 50), ("marry", 100), ("jack", 80)} => {("marry", 100), ("jack", 80), ("leo", 65), ("tom", 50)}
RDD[Array[T]] glom()
Collects the type-T elements of each partition into an Array[T].
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr, 2)
val arrRDD = rdd.glom()
arrRDD.foreach((arr: Array[Int]) => println("[ " + arr.mkString(" ") + " ]"))
=> [ 1 2 ], [ 3 4 5 ]
Key-value RDD transformations:
RDD[(K, U)] mapValues[U](f: V => U)    K: key type; V: value type
Transforms each value into a new U, leaving the key unchanged.
rdd.mapValues(_ + 1)
{("class1", 80), ("class2", 70)} => {("class1", 81), ("class2", 71)}
RDD[(K, U)] flatMapValues(f: V => TraversableOnce[U])
Applies a flatMap to the values of a [K, V] RDD.
rdd.flatMapValues(_.toCharArray())
{(1, "ab"), (2, "bc")} => {(1, 'a'), (1, 'b'), (2, 'b'), (2, 'c')}
RDD[(K, Iterable[V])] groupByKey()
Groups elements by key; each group's values form an Iterable[V], and the new RDD's elements are (key, Iterable[V]) tuples.
rdd.groupByKey()
{("class1", 80), ("class2", 75), ("class1", 90), ("class2", 60)} => {("class1", [80, 90]), ("class2", [75, 60])}
RDD[(K, Iterable[T])] groupBy(f: T => K)    T: element type of the source RDD; K: key type in the new RDD
Maps each element T to a key K with f, then groups by that key.
rdd.groupBy({ case 1 => 1; case 2 => 2; case "二" => 2 })
{1, 2, "二"} => {(1, [1]), (2, [2, "二"])}
RDD[(K, V)] reduceByKey(func: (V, V) => V)
Groups by key, then reduces each group's values: the first call to func receives two values of the same key; from then on, the first argument is the result of the previous call and the second is another value of that key.
rdd.reduceByKey(_ + _)
{("class1", 80), ("class2", 75), ("class1", 90), ("class2", 60)} => {("class1", 170), ("class2", 135)}
RDD[(K, V)] sortByKey()
Sorts elements by key (note: it does not group by key and then sort the groups; it sorts directly on the key values).
rdd.sortByKey(false)
{(65, "leo"), (50, "tom"), (100, "marry"), (85, "jack")} => {(100, "marry"), (85, "jack"), (65, "leo"), (50, "tom")}
RDD[(K, V)] foldByKey(zeroValue: V)(func: (V, V) => V)
zeroValue: the initial value folded into each key's first value within a partition, e.g. Nil for list concatenation, 0 for addition, or 1 for multiplication. As the trace below shows, it is applied once per key per partition, not again when partitions are merged.
foldByKey is implemented on top of combineByKey: zeroValue initializes each value through combineByKey's createCombiner, V => func(zeroValue, V), i.e. each key's first value is first combined with zeroValue.
func: values are then merged by key (through combineByKey's mergeValue and mergeCombiners, which here are the same function).
val people = List(("Mobin", 1), ("Lucy", 2), ("Amy", 3), ("Amy", 4), ("Lucy", 5))
val rdd = sc.parallelize(people, 2)
// first add 10 to each value, then sum the values of the same key
val foldByKeyRDD = rdd.foldByKey(10)((v1, v2) => { println(v1 + " + " + v2 + " = " + (v1 + v2)); v1 + v2 })
foldByKeyRDD.foreach(println)
// processing the first partition
10 + 1 = 11   // ("Mobin", 1)
10 + 2 = 12   // ("Lucy", 2)
// processing the second partition
10 + 3 = 13   // ("Amy", 3)
13 + 4 = 17   // ("Amy", 4): same key in the same partition is merged first
10 + 5 = 15   // ("Lucy", 5)
// merging the same key across partitions
12 + 15 = 27  // "Lucy" spans both partitions, so its partial results are merged
(Amy,17) (Mobin,11) (Lucy,27)
RDD[(K, (V, Option[W]))] leftOuterJoin[W](other: RDD[(K, W)])
Left outer join: keeps all data from the left RDD; keys with no match on the right are paired with None.
val arr = List(("A", 1), ("A", 2), ("B", 1))
val arr1 = List(("A", "A1"), ("A", "A2"))
val rdd = sc.parallelize(arr, 2)
val rdd1 = sc.parallelize(arr1, 2)
val leftOutJoinRDD = rdd.leftOuterJoin(rdd1)
leftOutJoinRDD.foreach(println)
=> (B,(1,None)) (A,(1,Some(A1))) (A,(1,Some(A2))) (A,(2,Some(A1))) (A,(2,Some(A2)))
RDD[(K, (Option[V], W))] rightOuterJoin[W](other: RDD[(K, W)])
Right outer join: keeps all data from the right RDD; keys with no match on the left are paired with None.
val arr = List(("A", 1), ("A", 2))
val arr1 = List(("A", "A1"), ("A", "A2"), ("B", 1))
val rdd = sc.parallelize(arr, 2)
val rdd1 = sc.parallelize(arr1, 2)
val rightOutJoinRDD = rdd.rightOuterJoin(rdd1)
rightOutJoinRDD.foreach(println)
=> (B,(None,1)) (A,(Some(1),A1)) (A,(Some(1),A2)) (A,(Some(2),A1)) (A,(Some(2),A2))
RDD[(K, (V, W))] join(other: RDD[(K, W)])    W: value type of the other RDD
Joins two RDDs of <key, value> pairs by key, producing elements of the form (key, (value1, value2)).
rdd.join(otherRdd)
{(1, "leo"), (2, "jack"), (3, "tom")} {(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)} => {(1, ("leo", 100)), (1, ("leo", 70)), (2, ("jack", 90)), (2, ("jack", 80)), (3, ("tom", 60)), (3, ("tom", 50))}
RDD[(K, (Iterable[V], Iterable[W]))] cogroup(other: RDD[(K, W)])
Like join, also joins by key, except that each key's values from the two RDDs are collected into separate Iterables.
rdd.cogroup(otherRdd)
{(1, "leo"), (2, "jack"), (3, "tom")} {(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)} => {(1, (["leo"], [100, 70])), (2, (["jack"], [90, 80])), (3, (["tom"], [60, 50]))}
RDD actions:
T reduce(f: (T, T) => T)
Reduces all elements with f.
rdd.reduce(_ + _)
{1, 2, 2, 3, 3, 3} => 14
Array[T] collect()
Returns all elements of the RDD in an array. Note: this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
rdd.collect()
{1, 2, 3, 3} => [1, 2, 3, 3]
Map[K, V] collectAsMap()
Works on K-V RDDs. Unlike collect, collectAsMap keeps no duplicate keys: for a duplicated key, later elements overwrite earlier ones.
rdd.collectAsMap()
{("leo", 65), ("tom", 50), ("tom", 100)} => {("leo", 65), ("tom", 100)}
Long count()
Counts the elements in the RDD.
rdd.count()
{1, 2, 3, 3} => 4
Map[T, Long] countByValue()
Counts how many times each element occurs in the RDD. Note: this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which returns an RDD[(T, Long)] instead of a map.
rdd.countByValue()
{1, 2, 3, 3} => Map(1 -> 1, 3 -> 2, 2 -> 1)
Map[K, Long] countByKey()
Groups by key, then counts the values in each group. Note: this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[(K, Long)] instead of a map.
rdd.countByKey()
{("leo", 65), ("tom", 50), ("tom", 100), ("tom", 100)} => Map(leo -> 1, tom -> 3)
T first()
Returns the first element; implemented as take(1).
rdd.first()
{3, 2, 1, 4} => 3
Array[T] take(num: Int)
Returns the first num elements of the RDD. Note: this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
rdd.take(2)
{3, 2, 1, 4} => [3, 2]
Array[T] top(num: Int)(implicit ord: Ordering[T])
If no ord argument is passed, the implicit parameter is used, and the default implicit Ordering sorts ascending; a custom Ordering can be passed to override it. top is implemented by reversing the Ordering and calling takeOrdered: takeOrdered(num)(ord.reverse).
By default returns the num largest elements. Note: this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
rdd.top(2)
{3, 2, 1, 4} => [4, 3]
Array[T] takeOrdered(num: Int)(implicit ord: Ordering[T])
If no ord argument is passed, the implicit parameter is used, and the default implicit Ordering sorts ascending; a custom Ordering can be passed to override it.
The opposite of top: by default returns the num smallest elements. Note: this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
rdd.takeOrdered(2)(myOrdering)
{3, 2, 1, 4} => [1, 2]
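A minimal sketch of passing a custom Ordering (the desc helper is illustrative, not from the original): sorting descending makes takeOrdered return the largest elements, matching what top does with the default Ordering.
val rdd = sc.parallelize(Array(3, 2, 1, 4))
val desc = Ordering.by[Int, Int](x => -x)   // compare by negated value, i.e. descending
rdd.takeOrdered(2)(desc)   // => Array(4, 3), same as rdd.top(2)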
T fold(zeroValue: T)(op: (T, T) => T)
zeroValue: the initial value for accumulating within each partition, and also for combining the partition results, e.g. Nil for list concatenation, 0 for addition, or 1 for multiplication.
Like reduce(), but an initial value must be supplied. Note: each partition starts its accumulation from zeroValue, and when the per-partition results are merged, the merge also starts from zeroValue.
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr, 2)   // two partitions: [1, 2] and [3, 4, 5]
println(rdd.fold(10)((v1, v2) => { println(v1 + " + " + v2 + " = " + (v1 + v2)); v1 + v2 }))
// processing the first partition
10 + 1 = 11
11 + 2 = 13   // from the second element on, values accumulate within the partition
// processing the second partition
10 + 3 = 13
13 + 4 = 17
17 + 5 = 22
// combining the partition results
10 + 13 = 23  // the combine step also starts from the initial value
23 + 22 = 45
45
U aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)
The initial value's type can differ from the element type of the source RDD, and it determines the return type.
Like fold, an initial value must be supplied; the difference is that the per-partition function (seqOp) and the partition-merging function (combOp) are separate, whereas fold uses the same function for both.
rdd.aggregate(5)(_ + _, _ + _)
val arr = Array(1, 2, 3, 4)
val rdd = sc.parallelize(arr, 2)
println(rdd.aggregate(5)(
  (v1, v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 + v2 },
  (v1, v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 + v2 }))
The process and result are the same as for fold above.
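Because zeroValue's type sets the result type, aggregate can return something other than the element type. A minimal sketch (variable names are illustrative) that folds Ints into a (sum, count) pair, from which an average could be computed:
val rdd = sc.parallelize(Array(1, 2, 3, 4), 2)
val (sum, count) = rdd.aggregate((0, 0))(
  (acc, e) => (acc._1 + e, acc._2 + 1),    // seqOp: fold an Int element into the (sum, count) pair
  (a, b) => (a._1 + b._1, a._2 + b._2))    // combOp: merge two partial (sum, count) pairs
=> sum = 10, count = 4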
Unit saveAsTextFile(path: String)
Saves the RDD's elements to text files, calling toString on each element.
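A minimal sketch (the output path is illustrative): Spark writes one part-NNNNN file per partition under the target directory.
rdd.saveAsTextFile("hdfs://namenode:8020/output/rdd-demo")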
Unit foreach(f: T => Unit)
Applies f to every element of the RDD.
rdd.foreach(println(_))
(no return value)
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]
createCombiner: called the first time a key is encountered; turns that key's V value into a C value (V => C).
mergeValue: called when the same key is encountered again; merges the C obtained from createCombiner with the newly arrived V into one C ((C, V) => C).
mergeCombiners: merges two C values into one C value.
partitioner: an existing or custom partitioner; defaults to HashPartitioner.
mapSideCombine: whether to combine on the map side; defaults to true.
Example: count males and females, outputting results in the form (gender, (name, name, ...), count):
import org.apache.spark.{SparkConf, SparkContext}

object CombineByKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("combinByKey")
    val sc = new SparkContext(conf)
    val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
    val rdd = sc.parallelize(people)
    val combinByKeyRDD = rdd.combineByKey(
      (x: String) => (List(x), 1),                                          // createCombiner: first name seen for a gender
      (peo: (List[String], Int), x: String) => (x :: peo._1, peo._2 + 1),   // mergeValue: add a name, bump the count
      (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))   // mergeCombiners
    combinByKeyRDD.foreach(println)
    sc.stop()
  }
}
Output:
(male,(List(Lufei, Kpop, Mobin),3))
(female,(List(Amy, Lucy),2))
Computation:
Partition 1:
K="male" --> ("male","Mobin") --> createCombiner("Mobin") => peo1 = ( List("Mobin") , 1 )
K="male" --> ("male","Kpop") --> mergeValue(peo1, "Kpop") => peo2 = ( "Kpop" :: peo1._1 , 1 + 1 )   // same key, so mergeValue merges the value in
K="female" --> ("female","Lucy") --> createCombiner("Lucy") => peo3 = ( List("Lucy") , 1 )
Partition 2:
K="male" --> ("male","Lufei") --> createCombiner("Lufei") => peo4 = ( List("Lufei") , 1 )
K="female" --> ("female","Amy") --> createCombiner("Amy") => peo5 = ( List("Amy") , 1 )
Merging partitions:
K="male" --> mergeCombiners(peo2, peo4) => ( List("Lufei", "Kpop", "Mobin") , 3 )
K="female" --> mergeCombiners(peo3, peo5) => ( List("Amy", "Lucy") , 2 )