[Spark] Pair RDD常見轉化操做

本篇博客中的操做都在 ./bin/pyspark 中執行。python

對單個 Pair RDD 的轉化操做

下面會對 Pair RDD 的一些轉化操做進行解釋。先假設咱們有下面這些RDD(在pyspark中操做):spa

nums = sc.parallelize( [ (1,2)       ,(3,4)       ,(3,6)      ] )
x    = sc.parallelize( [ (1,[2,4,5]) ,(4,[7,8,0]) ,(4,[6,7,5])] )

reduceByKey

概述:合併具備相同鍵值code

例子:blog

>>> nums.reduceByKey(lambda x, y : x + y).collect()
[(1, 2), (3, 10)]
>>> 
>>> x.reduceByKey(lambda x, y: x + y).collect()
[(1, [2, 4, 5]), (4, [7, 8, 0, 6, 7, 5])]

這個方法操做的是值(Values),對上面的兩個RDD的操做,第一個是對值作加法,第二個是對列表合併;這兩個操做均可以使用lambda x, y : x + y來完成。排序

再來一個例子,求平均值,(下面的這個RDD的鍵值中,第一個值是總和,第二個值是數量):博客

>>> test = sc.parallelize([('panda', (1,2)), ('pink',(7,2)), ('pirate',(3,1))])
>>> test.mapValues(lambda (x,y): x / (y* 1.0)).collect()
[('panda', 0.5), ('pink', 3.5), ('pirate', 3.0)]

groupByKey

groupByKey 方法的目的是對具備相同鍵值的數據進行分組,好比說:it

>>> l = nums.groupByKey().collect()[1][1]
>>> l
<pyspark.resultiterable.ResultIterable object at 0x109320f10>
>>> for i in l:
...     print i
... 
4
6

直觀地來講,對nums這個RDD的groupByKey操做能夠表示爲:spark

[(1,2),(3,4),(3,6)]  ->  [ (1,[2]), (3, [4,6] )]

而後是對於x這個RDD的:class

>>> x    = sc.parallelize( [ (1,[2,4,5]) ,(4,[7,8,0]) ,(4,[6,7,5])] )

>>> l = x.groupByKey().collect()
>>> l
[(1, <pyspark.resultiterable.ResultIterable object at 0x109310690>), (4, <pyspark.resultiterable.ResultIterable object at 0x109310050>)]

>>> l2 = l[1][1]
>>> l2
<pyspark.resultiterable.ResultIterable object at 0x109310050>

>>> for i in l2:
...     print i
... 
[7, 8, 0]
[6, 7, 5]

直觀的來講:test

[  (1,[2,4,5]), (4,[7,8,0]) ,(4,[6,7,5] ) ]
         +
         |   +-------------+
         |   |   RDD.join  |
         |   +-------------+
         v
[ (1,[2,4,5]), (4, [ [6,7,5], [7,8,0] ] ) ]

mapValues

這個比較好理解,對每一個鍵值進行操做:

>>> nums.mapValues(lambda x : x+ 3).collect()
[(1, 5), (3, 7), (3, 9)]

flatMapValues

這個方法的做用是對pair RDD 的每一個值(values)生成一個與原鍵(key)對應的鍵值對記錄。

x  = sc.parallelize( [ (1,[2,4,5]) ,(4,[7,8,0]) ,(4,[6,7,5])] )

>>> def f(x):
...     return x
...

>>> x.flatMapValues(f).collect()
[(1, 2), (1, 4), (1, 5), (4, 7), (4, 8), (4, 0), (4, 6), (4, 7), (4, 5)]

這個能夠用"flat"這個英文單詞的意思來大體理解一下,flat有使變平,拍扁的意思。

對於 nums 這種RDD是進行不了這個方法的。

keys()

返回全部的鍵值:

>>> nums.keys().collect()
[1, 3, 3]
>>> x.keys().collect()
[1, 4, 4]

values

返回全部的值:

>>> nums.values().collect()
[2, 4, 6]
>>> x.values().collect()
[[2, 4, 5], [7, 8, 0], [6, 7, 5]]

sortByKey

按照鍵值排序,這個比較好理解:

>>> notSorted = sc.parallelize( [ (7,[2,4,5]) ,(9,[7,8,0]) ,(4,[6,7,5])] )

>>> notSorted.sortByKey().collect()
[(4, [6, 7, 5]), (7, [2, 4, 5]), (9, [7, 8, 0])]

對2個Pair RDD的轉化操做

這裏咱們有:

other = sc.parallelize([(3,9)])
nums = sc.parallelize([(1,2),(3,4),(3,6)])

subtractByKey

返回一個減去兩個RDD中同樣的Key的RDD,能夠理解爲除去下圖中重合的部分

subtractByKey

一個例子:

>>> nums.subtractByKey(other).collect()
[(1, 2)]

join

這個操的做描爲:返回一個RDD,返回的RDD只包含輸入的兩個RDD都包含的鍵值,每一個鍵值對形如(k, (v1, v2)),其中v1被本身包含,v2被另外一個RDD包含。一個例子:

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> x.join(y).collect()
[('a', (1, 2)), ('a', (1, 3))]

能夠理解爲:

RDD.join

同理換作咱們開始提到的兩個RDD:

>>> nums.join(other).collect()
[(3, (4, 9)), (3, (6, 9))]

rightOuterJoin && leftOuterJoin

這兩個方法的Join的原理和上面的join同樣,關於left 和 right 的說明是:

  • right:確保右邊的RDD的鍵必須存在
  • left:確保左邊的RDD的鍵必須存在

一個例子:

>>> nums.rightOuterJoin(other).collect()
[(3, (4, 9)), (3, (6, 9))]

>>> nums.leftOuterJoin(other).collect()
[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
相關文章
相關標籤/搜索