Spark 系列(四)—— RDD經常使用算子詳解

1、Transformation

spark 經常使用的 Transformation 算子以下表:html

Transformation 算子 Meaning(含義)
map(func) 對原 RDD 中每一個元素運用 func 函數,並生成新的 RDD
filter(func) 對原 RDD 中每一個元素使用func 函數進行過濾,並生成新的 RDD
flatMap(func) 與 map 相似,可是每個輸入的 item 被映射成 0 個或多個輸出的 items( func 返回類型須要爲 Seq )。
mapPartitions(func) 與 map 相似,但函數單獨在 RDD 的每一個分區上運行, func函數的類型爲 Iterator<T> => Iterator<U> ,其中 T 是 RDD 的類型,即 RDD[T]
mapPartitionsWithIndex(func) 與 mapPartitions 相似,但 func 類型爲 (Int, Iterator<T>) => Iterator<U> ,其中第一個參數爲分區索引
sample(withReplacement, fraction, seed) 數據採樣,有三個可選參數:設置是否放回(withReplacement)、採樣的百分比(fraction)、隨機數生成器的種子(seed);
union(otherDataset) 合併兩個 RDD
intersection(otherDataset) 求兩個 RDD 的交集
distinct([numTasks])) 去重
groupByKey([numTasks]) 按照 key 值進行分區,即在一個 (K, V) 對的 dataset 上調用時,返回一個 (K, Iterable<V>)
Note: 若是分組是爲了在每個 key 上執行聚合操做(例如,sum 或 average),此時使用 reduceByKeyaggregateByKey 性能會更好
Note: 默認狀況下,並行度取決於父 RDD 的分區數。能夠傳入 numTasks 參數進行修改。
reduceByKey(func, [numTasks]) 按照 key 值進行分組,並對分組後的數據執行歸約操做。
aggregateByKey(zeroValue,numPartitions)(seqOp, combOp, [numTasks]) 當調用(K,V)對的數據集時,返回(K,U)對的數據集,其中使用給定的組合函數和 zeroValue 聚合每一個鍵的值。與 groupByKey 相似,reduce 任務的數量可經過第二個參數進行配置。
sortByKey([ascending], [numTasks]) 按照 key 進行排序,其中的 key 須要實現 Ordered 特質,便可比較
join(otherDataset, [numTasks]) 在一個 (K, V) 和 (K, W) 類型的 dataset 上調用時,返回一個 (K, (V, W)) pairs 的 dataset,等價於內鏈接操做。若是想要執行外鏈接,可使用 leftOuterJoin, rightOuterJoinfullOuterJoin 等算子。
cogroup(otherDataset, [numTasks]) 在一個 (K, V) 對的 dataset 上調用時,返回一個 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset。
cartesian(otherDataset) 在一個 T 和 U 類型的 dataset 上調用時,返回一個 (T, U) 類型的 dataset(即笛卡爾積)。
coalesce(numPartitions) 將 RDD 中的分區數減小爲 numPartitions。
repartition(numPartitions) 隨機從新調整 RDD 中的數據以建立更多或更少的分區,並在它們之間進行平衡。
repartitionAndSortWithinPartitions(partitioner) 根據給定的 partitioner(分區器)對 RDD 進行從新分區,並對分區中的數據按照 key 值進行排序。這比調用 repartition 而後再 sorting(排序)效率更高,由於它能夠將排序過程推送到 shuffle 操做所在的機器。

下面分別給出這些算子的基本使用示例:java

1.1 map

val list = List(1,2,3)
sc.parallelize(list).map(_ * 10).foreach(println)

// 輸出結果: 10 20 30 (這裏爲了節省篇幅去掉了換行,後文亦同)
複製代碼

1.2 filter

val list = List(3, 6, 9, 10, 12, 21)
sc.parallelize(list).filter(_ >= 10).foreach(println)

// 輸出: 10 12 21
複製代碼

1.3 flatMap

flatMap(func)map 相似,但每個輸入的 item 會被映射成 0 個或多個輸出的 items( func 返回類型須要爲 Seq)。git

val list = List(List(1, 2), List(3), List(), List(4, 5))
sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)

// 輸出結果 : 10 20 30 40 50
複製代碼

flatMap 這個算子在日誌分析中使用機率很是高,這裏進行一下演示:拆分輸入的每行數據爲單個單詞,並賦值爲 1,表明出現一次,以後按照單詞分組並統計其出現總次數,代碼以下:github

val lines = List("spark flume spark",
                 "hadoop flume hive")
sc.parallelize(lines).flatMap(line => line.split(" ")).
map(word=>(word,1)).reduceByKey(_+_).foreach(println)

// 輸出:
(spark,2)
(hive,1)
(hadoop,1)
(flume,2)
複製代碼

1.4 mapPartitions

與 map 相似,但函數單獨在 RDD 的每一個分區上運行, func函數的類型爲 Iterator<T> => Iterator<U> (其中 T 是 RDD 的類型),即輸入和輸出都必須是可迭代類型。apache

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitions(iterator => {
  val buffer = new ListBuffer[Int]
  while (iterator.hasNext) {
    buffer.append(iterator.next() * 100)
  }
  buffer.toIterator
}).foreach(println)
//輸出結果
100 200 300 400 500 600
複製代碼

1.5 mapPartitionsWithIndex

與 mapPartitions 相似,但 func 類型爲 (Int, Iterator<T>) => Iterator<U> ,其中第一個參數爲分區索引。數組

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
  val buffer = new ListBuffer[String]
  while (iterator.hasNext) {
    buffer.append(index + "分區:" + iterator.next() * 100)
  }
  buffer.toIterator
}).foreach(println)
//輸出
0 分區:100
0 分區:200
1 分區:300
1 分區:400
2 分區:500
2 分區:600
複製代碼

1.6 sample

數據採樣。有三個可選參數:設置是否放回 (withReplacement)、採樣的百分比 (fraction)、隨機數生成器的種子 (seed) :app

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list).sample(withReplacement = false, fraction = 0.5).foreach(println)
複製代碼

1.7 union

合併兩個 RDD:ide

val list1 = List(1, 2, 3)
val list2 = List(4, 5, 6)
sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
// 輸出: 1 2 3 4 5 6
複製代碼

1.8 intersection

求兩個 RDD 的交集:函數

val list1 = List(1, 2, 3, 4, 5)
val list2 = List(4, 5, 6)
sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
// 輸出: 4 5
複製代碼

1.9 distinct

去重:oop

val list = List(1, 2, 2, 4, 4)
sc.parallelize(list).distinct().foreach(println)
// 輸出: 4 1 2
複製代碼

1.10 groupByKey

按照鍵進行分組:

val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)

//輸出:
(spark,List(3, 5))
(hadoop,List(2, 2))
(storm,List(6))
複製代碼

1.11 reduceByKey

按照鍵進行歸約操做:

val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)

//輸出
(spark,8)
(hadoop,4)
(storm,6)
複製代碼

1.12 sortBy & sortByKey

按照鍵進行排序:

val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
// 輸出
(120,storm)
(90,spark)
(100,hadoop)
複製代碼

按照指定元素進行排序:

val list02 = List(("hadoop",100), ("spark",90), ("storm",120))
sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
// 輸出
(storm,120)
(hadoop,100)
(spark,90)
複製代碼

1.13 join

在一個 (K, V) 和 (K, W) 類型的 Dataset 上調用時,返回一個 (K, (V, W)) 的 Dataset,等價於內鏈接操做。若是想要執行外鏈接,可使用 leftOuterJoin, rightOuterJoinfullOuterJoin 等算子。

val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)

// 輸出
(1,(student01,teacher01))
(3,(student03,teacher03))
(2,(student02,teacher02))
複製代碼

1.14 cogroup

在一個 (K, V) 對的 Dataset 上調用時,返回多個類型爲 (K, (Iterable<V>, Iterable<W>)) 的元組所組成的 Dataset。

val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
val list02 = List((1, "A"), (2, "B"), (3, "E"))
val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE"))
sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)

// 輸出: 同一個 RDD 中的元素先按照 key 進行分組,而後再對不一樣 RDD 中的元素按照 key 進行分組
(1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
(3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
(2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))

複製代碼

1.15 cartesian

計算笛卡爾積:

val list1 = List("A", "B", "C")
val list2 = List(1, 2, 3)
sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)

//輸出笛卡爾積
(A,1)
(A,2)
(A,3)
(B,1)
(B,2)
(B,3)
(C,1)
(C,2)
(C,3)
複製代碼

1.16 aggregateByKey

當調用(K,V)對的數據集時,返回(K,U)對的數據集,其中使用給定的組合函數和 zeroValue 聚合每一個鍵的值。與 groupByKey 相似,reduce 任務的數量可經過第二個參數 numPartitions 進行配置。示例以下:

// 爲了清晰,如下全部參數均使用具名傳參
val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
      seqOp = math.max(_, _),
      combOp = _ + _
    ).collect.foreach(println)
//輸出結果:
(hadoop,3)
(storm,8)
(spark,7)
複製代碼

這裏使用了 numSlices = 2 指定 aggregateByKey 父操做 parallelize 的分區數量爲 2,其執行流程以下:

https://github.com/heibaiying

基於一樣的執行流程,若是 numSlices = 1,則意味着只有輸入一個分區,則其最後一步 combOp 至關因而無效的,執行結果爲:

(hadoop,3)
(storm,8)
(spark,4)
複製代碼

一樣的,若是每一個單詞對一個分區,即 numSlices = 6,此時至關於求和操做,執行結果爲:

(hadoop,5)
(storm,14)
(spark,7)
複製代碼

aggregateByKey(zeroValue = 0,numPartitions = 3) 的第二個參數 numPartitions 決定的是輸出 RDD 的分區數量,想要驗證這個問題,能夠對上面代碼進行改寫,使用 getNumPartitions 方法獲取分區數量:

sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
  seqOp = math.max(_, _),
  combOp = _ + _
).getNumPartitions
複製代碼

https://github.com/heibaiying

2、Action

Spark 經常使用的 Action 算子以下:

Action(動做) Meaning(含義)
reduce(func) 使用函數func執行歸約操做
collect() 以一個 array 數組的形式返回 dataset 的全部元素,適用於小結果集。
count() 返回 dataset 中元素的個數。
first() 返回 dataset 中的第一個元素,等價於 take(1)。
take(n) 將數據集中的前 n 個元素做爲一個 array 數組返回。
takeSample(withReplacement, num, [seed]) 對一個 dataset 進行隨機抽樣
takeOrdered(n, [ordering]) 按天然順序(natural order)或自定義比較器(custom comparator)排序後返回前 n 個元素。只適用於小結果集,由於全部數據都會被加載到驅動程序的內存中進行排序。
saveAsTextFile(path) 將 dataset 中的元素以文本文件的形式寫入本地文件系統、HDFS 或其它 Hadoop 支持的文件系統中。Spark 將對每一個元素調用 toString 方法,將元素轉換爲文本文件中的一行記錄。
saveAsSequenceFile(path) 將 dataset 中的元素以 Hadoop SequenceFile 的形式寫入到本地文件系統、HDFS 或其它 Hadoop 支持的文件系統中。該操做要求 RDD 中的元素須要實現 Hadoop 的 Writable 接口。對於 Scala 語言而言,它能夠將 Spark 中的基本數據類型自動隱式轉換爲對應 Writable 類型。(目前僅支持 Java and Scala)
saveAsObjectFile(path) 使用 Java 序列化後存儲,可使用 SparkContext.objectFile() 進行加載。(目前僅支持 Java and Scala)
countByKey() 計算每一個鍵出現的次數。
foreach(func) 遍歷 RDD 中每一個元素,並對其執行fun函數

2.1 reduce

使用函數func執行歸約操做:

val list = List(1, 2, 3, 4, 5)
sc.parallelize(list).reduce((x, y) => x + y)
sc.parallelize(list).reduce(_ + _)

// 輸出 15
複製代碼

2.2 takeOrdered

按天然順序(natural order)或自定義比較器(custom comparator)排序後返回前 n 個元素。須要注意的是 takeOrdered 使用隱式參數進行隱式轉換,如下爲其源碼。因此在使用自定義排序時,須要繼承 Ordering[T] 實現自定義比較器,而後將其做爲隱式參數引入。

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  .........
}
複製代碼

自定義規則排序:

// 繼承 Ordering[T],實現自定義比較器,按照 value 值的長度進行排序
class CustomOrdering extends Ordering[(Int, String)] {
    override def compare(x: (Int, String), y: (Int, String)): Int
    = if (x._2.length > y._2.length) 1 else -1
}

val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
//  引入隱式默認值
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list).takeOrdered(5)

// 輸出: Array((1,hive), (1,storm), (1,hadoop), (1,azkaban)
複製代碼

2.3 countByKey

計算每一個鍵出現的次數:

val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).countByKey()

// 輸出: Map(hadoop -> 2, storm -> 2, azkaban -> 1)
複製代碼

2.4 saveAsTextFile

將 dataset 中的元素以文本文件的形式寫入本地文件系統、HDFS 或其它 Hadoop 支持的文件系統中。Spark 將對每一個元素調用 toString 方法,將元素轉換爲文本文件中的一行記錄。

val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).saveAsTextFile("/usr/file/temp")
複製代碼

參考資料

RDD Programming Guide

更多大數據系列文章能夠參見 GitHub 開源項目大數據入門指南

相關文章
相關標籤/搜索