Spark RDD 轉化操做與行動操做

本文摘自《Spark 快速大數據分析》

概述

  • RDD 支持兩種操做:轉化操做(Transformation)和行動操做(Action)。
  • 轉化操做時返回一個新的 RDD 的操做,好比 map() 和 filter()。
  • 行動操做則是向驅動器程序返回結果或把結果寫入外部系統的操做,會觸發實際的計算,好比 count() 和first()。
  • Spark 對待轉化操做和行動操做的方式很不同,所以理解你正在進行的操做很重要的。
  • 若是對於一個特定的函數是屬於轉化操做仍是行動操做感到困惑,你能夠看看它的返回值類型:轉化操做返回的是 RDD,而行動操做返回的是其餘的數據類型。

RDD 轉化操做

  • 表 3-2:對一個數據爲 {1, 2, 3, 3} 的 RDD 進行基本的 RDD 轉化操做
函數名 目的 示例 結果
map() 將函數應用於 RDD 中的每一個元素,將返回值構成新的 RDD rdd.map(x -> x+1) {2, 3, 4, 4}
flatMap() 將函數應用於 RDD 中的每一個元素,將返回的迭代器的全部內容構成新的 RDD。一般用來切分單詞 rdd.flatMap(x -> x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter() 返回一個由經過傳給 filter() 的函數的元素組成的 RDD rdd.filter(x -> x != 1) {2, 3, 3}
distinct() 去重 rdd.distinct() {1, 2, 3}
sample(withReplacement, fraction, [seed]) 對 RDD 採樣,以及是否替換 rdd.sample(false, 0.5) 非肯定的
  • 表 3-3:對數據分別爲 {1, 2, 3} 和 {3, 4, 5} 的 RDD 進行鍼對兩個 RDD 的轉化操做
函數名 目的 示例 結果
union() 生成一個包含兩個 RDD 中全部元素的 RDD rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() 求兩個 RDD 共同的元素的 RDD rdd.intersection(other) {3}
subtract() 移除另外一個 RDD 中的元素 rdd.subtract(other) {1, 2}
cartesian() 於另外一個 RDD 的笛卡爾積 rdd.cartesian(other) {(1, 3), (1, 4), (1, 5), (2, 3), (2, 4), (2, 5), (3, 3), (3, 4), (3, 5)}

RDD 行動操做

  • 表 3-4:對一個數據爲 {1, 2, 3, 3} 的 RDD 進行基本的 RDD 行動操做
函數名 目的 示例 結果
collect() 返回 RDD 中的全部元素 rdd.collect() {1, 2, 3, 3}
count() RDD 中的元素個數 rdd.count() 4
countByValue() 各元素再 RDD 中出現的次數 rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) 從 RDD 中返回 num 個元素 rdd.take(2) {1, 2}
top(num) 從 RDD 中返回最前面的 num 個元素 rdd.top(2) {3, 3}
takeOrdered(num)(ordering) 從 RDD 中按照提供的順序返回最前面的 num 個元素 rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(withReplacement, num, [seed]) 從 RDD 中返回任意一些元素 rdd.takeSample(false, 1) 非肯定的
reduce(func) 並行整合 RDD 中的數據(例如 sum) rdd.reduce((x, y) -> x + y) 9
fold(zeor)(func) 和 reduce() 同樣,可是須要提供初始值 rdd.fold(0)((x, y) -> x + y) 9
★ aggregate(zeroValue)(seqOp, combOp) 和 reduce() 類似,可是一般返回不一樣類型的函數 rdd.aggergate((0, 0))((x, y) -> (x._1 + y, x._2 + 1), (x, y) -> (x._1 + y._1, x._2 + y._2)) (9, 4)
foreach(func) 對 RDD 中的每一個元素使用給定的函數 rdd.foreach(func)

Pair RDD 轉化操做

  • 表 4-1:Pair RDD 的轉化操做,以鍵值對{(1, 2), (3, 4), (3, 6)} 爲例
函數名 目的 示例 結果
reduceByKey(func) 合併具備相同鍵的值 rdd.reduceByKey((x, y) -> x + y) {(1, 2), (3, 10)}
groupByKey() 對具備相同鍵的值進行分組 rdd.groupByKey() {(1, [2]), (3, [4, 6])}
★ combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) 使用不一樣返回類型合併具備相同鍵的值 見例4-12 到例 4-14
mapValues(func) 對 pair RDD 中的每一個值應用一個函數而不改變鍵 rdd.mapValues(x -> x + 1) {(1, 3), (3, 5), (3, 7)}
flatMapValues(func) 對 pair RDD 中的每一個值應用一個返回迭代器的函數,而後對返回的每一個元素都生成一個對應原鍵值對記錄。一般用於符號化 rdd.flatMapValues(x -> (x to 5)) {(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)}
keys() 返回一個僅包含鍵的 RDD rdd.keys() {1, 3, 3}
values() 返回一個僅包含值的 RDD rdd.values() {2, 4, 6}
sortByKey() 返回一個根據鍵排序的 RDD rdd.sortByKey() {(1, 2), (3, 4), (3, 6)}
  • 表 4-2:針對兩個 Pair RDD 的轉化操做,rdd = {(1, 2), (3, 4), (3, 6)} other = {(3, 9)}
函數名 目的 示例 結果
subtractByKey 刪掉 RDD 中鍵與 other RDD 中的鍵相同的元素 rdd.substractByKey(other) {(1, 2)}
join 對兩個 RDD 進行內鏈接 rdd.join(other) {(3, (4, 9)), (3, (6, 9))}
★ rightOuterJoin 對兩個 RDD 進行鏈接操做,確保第一個 RDD 的鍵必須存在(右外鏈接) rdd.rightOuterJoin(other) {(3, (Some(4), 9)), (3, (Some(6), 9))}
★ leftOuterJoin 對兩個 RDD 進行鏈接操做,確保第二個 RDD 的鍵必須存在(左外鏈接) rdd.leftOuterJoin(other) {(1, (2, None)), (3, (4, Some(9))), (3, (6, Some(9)))}
cogroup 將兩個RDD 中擁有相同鍵的數據分組到一塊兒 rdd.cogroup(other) {(1, ([2], [])), (3, ([4, 6], [9]))}

Pair RDD 行動操做

  • 表 4-3:Pair RDD 的行動操做,以鍵值對集合 {(1, 2), (3, 4), (3, 6)} 爲例
函數名 目的 示例 結果
countByKey() 對每一個鍵對應的元素分別計數 rdd.countByKey() {(1, 1), (3, 2)}
collectAsMap() 將結果以映射表的形式返回,以便查詢 rdd.collectAsMap() Map{(1, 2), (3, 6)}
lookup(key) 返回給定鍵對應的全部值 rdd.lookup(3) [4, 6]
本文出自 walker snapshot
相關文章
相關標籤/搜索