spark RDD算子詳解3

Actions算子數組

本質上在Actions算子中經過SparkContext執行提交做業的runJob操做,觸發了RDD DAG的執行。分佈式

1.無輸出函數

(1)foreach(f)優化

對RDD中的每一個元素都應用f函數操做,不返回RDD和Array,而是返回Uint。spa

圖3-25表示foreach算子經過用戶自定義函數對每一個數據項進行操做。本例中自定義函數爲println(),控制檯打印全部數據項。scala

2.HDFScode

saveAsTextFile(path, compressionCodecClass=None)排序

函數將數據輸出,存儲到HDFS的指定目錄。
將RDD中的每一個元素映射轉變爲(Null, x.toString),而後再將其寫入HDFS。內存

圖3-26中左側的方框表明RDD分區,右側方框表明HDFS的Block。經過函數將RDD的每一個分區存儲爲HDFS中的一個Block。io

3.Scala集合和數據類型

(1)collect()

collect將分佈式的RDD返回爲一個單機的scala Array數組。在這個數組上運用scala的函數式操做。

圖3-28中的左側方框表明RDD分區,右側方框表明單機內存中的數組。經過函數操做,將結果返回到Driver程序所在的節點,以數組形式存儲。

(2)collectAsMap()

collectAsMap對(K, V)型的RDD數據返回一個單機HashMap。對於重複K的RDD元素,後面的元素覆蓋前面的元素。

圖3-29中的左側方框表明RDD分區,右側方框表明單機數組。數據經過collectAsMap函數返回給Driver程序計算結果,結果以HashMap形式存儲。

(3)reduceByKeyLocally(func)

實現的是先reduce再collectAsMap的功能,先對RDD的總體進行reduce操做,而後再收集全部結果返回爲一個HashMap。

(4)lookup(key)

Lookup函數對(Key, Value)型的RDD操做,返回指定Key對應的元素造成的Seq。這個函數處理優化的部分在於,若是這個RDD包含分區器,則只會對應處理K所在的分區,而後返回由(K, V)造成的Seq。若是RDD不包含分區器,則須要對全RDD元素進行暴力掃描處理,搜索指定K對應的元素。

圖3-30中的左側方框表明RDD分區,右側方框表明Seq,最後結果返回到Driver所在節點的應用中。

(5)count()

count返回整個RDD的元素個數。內部函數實現以下。
在圖3-31中,返回數據的個數爲5。一個方塊表明一個RDD分區。

(6)top(num, key=None)

top可返回最大的k個元素。
相近函數說明以下。

top返回最大的k個元素。

take返回最小的k個元素。

takeOrdered返回最小的k個元素,而且在返回的數組中保持元素的順序。

first至關於top(1)返回整個RDD中的前k個元素,能夠定義排序的方式Ordering[T]。返回的是一個含前k個元素的數組。

(7)reduce(f)

經過函數func(接受兩個參數,返回一個參數)彙集數據集中的全部元素。這個功能必須可交換且可關聯的,從而能夠正確的被並行執行。

例子:

>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10

(8)fold(zeroValue, op)

fold和reduce的原理相同,可是與reduce不一樣,至關於每一個reduce時,迭代器取的第一個元素是zeroValue。

>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
相關文章
相關標籤/搜索