基本的 RDD 轉化操做html
map()python
語法:RDD.map(<function>,preservesPartitoning=False)apache
轉化操做 map() 是全部轉化操做中最基本的。它將一個具名函數或匿名函數對數據集內的全部元素進行求值。map() 函數能夠異步執行,也不會嘗試與別的 map() 操做通訊或同步。也就是說,這是無共享的操做。api
參數 preserversPatitioning 是可選的,爲 Boolean 類型的參數,用於定義了區分規則的 RDD,它們有定義好的鍵,並按照鍵的哈希值或範圍進行了分組。若是這個參數被設爲 True,這些分區會保存完整。這個參數能夠被 Spark 調度器用於優化後續操做,好比,基於分區的鍵進行的鏈接操做。數據結構
轉化操做 map() 對輸入的每條記錄計算同一個函數,並生成轉化後的輸出記錄。異步
# map()分佈式
map_rdd=sc.textFile('file:///usr/local/spark/README.md')ide
print(map_rdd.take(5))函數
map_rdd_new=map_rdd.map(lambda x:x.split(' '))性能
print(map_rdd_new.take(5))
# 輸出
['# Apache Spark', '', 'Spark is a fast and general cluster computing system for Big Data. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a']
[['#', 'Apache', 'Spark'], [''], ['Spark', 'is', 'a', 'fast', 'and', 'general', 'cluster', 'computing', 'system', 'for', 'Big', 'Data.', 'It', 'provides'], ['high-level', 'APIs', 'in', 'Scala,', 'Java,', 'Python,', 'and', 'R,', 'and', 'an', 'optimized', 'engine', 'that'], ['supports', 'general', 'computation', 'graphs', 'for', 'data', 'analysis.', 'It', 'also', 'supports', 'a']]
在這個例子中,split 函數接收一個字符串,生成一個列表,輸入數據中的每一個字符串元素都被映射爲輸出數據中的一個列表元素。產生的結果爲一個列表的列表。
2.flatMap()
語法::RDD.flatMap(<function>,preservesPartitioning=False)
轉化操做 flatMap() 和轉化操做 map() 相似,都將函數做用於輸入數據集的每條記錄。可是,flatMap() 還會「拍平」輸出數據,這表示它會去掉一層嵌套。好比,給定一個包含字符串列表的列表,「拍平」操做會產生一個由字符串組成的列表,也就是「拍平」了全部嵌套的列表。
# flatMap()
flat_map_rdd=sc.textFile('file:///usr/local/spark/README.md')
print(flat_map_rdd.take(5))
map_rdd_new=flat_map_rdd.flatMap(lambda x:x.split(' '))
print(map_rdd_new.take(5))
# 輸出
['# Apache Spark', '', 'Spark is a fast and general cluster computing system for Big Data. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a']
['#', 'Apache', 'Spark', '', 'Spark']
在這個例子中,flatMap() 使用的匿名函數和 map() 操做所使用的相同。注意,每一個字符串並無產生一個對應的列表對象,全部的元素拍平到一個列表中。換句話說,這個例子裏的 flatMap() 產生了一個組合的列表做爲輸出,而不是 map() 中那個列表的列表。
3.filter()
語法:RDD.filter(<function>)
轉化操做 filter 講一個 Boolean 類型的表達式對數據集裏的每一個元素進行求值,這個表達式一般用匿名函數來表示。返回的布爾值決定了該記錄是否被包含在產生的輸出 RDD 裏。這是一種經常使用的轉化操做,用於從 RDD 中移除不須要的記錄做爲中間結果,或者移除不須要放在最終輸出裏的記錄。
# filter()
licenses = sc.textFile('file:///usr/local/spark/README.md')
words = licenses.flatMap(lambda x:x.spilt(' '))
print(words.take(5))
lowercase = words.map(lambda x:x.lower())
print(lowercase.take(5))
longwords = lowercase.filter(lambda x:len(x) > 12)
print(longwords.take(5))
# 輸出
['#', 'Apache', 'Spark', '', 'Spark']
['#', 'apache', 'spark', '', 'spark']
['<http://spark.apache.org/>', 'documentation', 'documentation,', 'page](http://spark.apache.org/documentation.html).', 'instructions.']
4.distinct()
語法:RDD.distinct(numPartitions=None)
轉化操做 distinct() 返回一個新的 RDD,其中僅包含輸入 RDD 中去重後的元素。它能夠用來去除重複的值。參數 numPartitions 能夠把數據從新分區爲給定的分區數量。若是沒有提供這個參數或是使用了默認值,那麼轉化操做 distinct() 返回的分區數和輸入的 RDD 的 分區數保持一致。
# distinct()
licenses = sc.textFile('file:///usr/local/spark/README.md')
words = licenses.flatMap(lambda x : x.split(' '))
lowercase = words.map(lambda x : x.lower())
allwords = lowercase.count()
diswords = lowercase.distinct().count()
print ("Total words : {} ,Distinct words: {}".format(allwords,diswords))
# 輸出
Total words : 579 ,Distinct words: 276
5.groupBy()
語法:RDD.groupBy(<function>,numPartitons=None)
轉化操做 groupBy() 返回一個按指定函數對元素進行分組的 RDD。參數 <function> 能夠是具名函數,也能夠是匿名函數,用來肯定對全部元素進行分組的鍵,或者指定用於對元素進行求值以肯定其所屬分組的表達式。參數 numPartitions,經過計算分組函數輸出的鍵空間的哈希值,以自動建立指定數量的分區。要注意的是,groupBy() 返回的是一個可迭代對象。
# groupBy()
licenses = sc.textFile('file:///usr/local/spark/README.md')
words = licenses.flatMap(lambda x: x.split(' ')).filter(lambda x:len(x) > 0)
groupbyfirstletter = words.groupBy(lambda x: x[0].lower)
print(groupbyfirstletter.take(1))
# 輸出
[(<built-in method lower of str object at 0x7fbf2ef1b228>, <pyspark.resultiterable.ResultIterable object at 0x7fbf22238ef0>)]
6.sortBy()
語法:RDD.sortBy(<keyfunc>,ascending=True,numPartitions=None)
轉化操做 sortBy() 將 RDD 按照 <keyfunc> 參數選出的指定數據集的鍵進行排序。它根據鍵對象的類型的順序進行排序。參數 ascending 是布爾類型的參數,默認爲 True,指定所使用的排序順序。若是要使用降序,須要設置 ascending=False。
# sortBy()
readme = sc.textFile('file:///usr/local/spark/README.md')
words = readme.flatMap(lambda x:x.split(' ')).filter(lambda x:len(x) > 0)
sortbyfirstletter = words.sortBy(lambda x:x[0].lower(),ascending=False)
print(sortbyfirstletter.take(5))
# 輸出
['You', 'you', 'You', 'you', 'you']
基本的 RDD 行動操做
Spark 中的行動操做要麼返回值,好比 count();要麼返回數據,好比 collect();要麼保存數據到外部,好比 saveAsTextFile()。在全部狀況中,行動操做都會對 RDD 及其全部父 RDD 強制進行計算。一些行動操做返回計數,或是數據的聚合值,或是 RDD 中所有或部分數據。與這些不一樣的是,行動操做 foreach() 會對 RDD 中的每一個元素執行一個函數。
1.count()
語法:RDD.count()
行動操做 count() 不接收參數,返回一個 long 類型的值,表明 RDD 中元素的個數。
# count()
licenses = sc.textFile('file:///usr/local/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
print(words.count())
# 輸出
22997
注意,對於不接收參數的行動操做,須要在行動操做名帶上空的括號 ()。
2.collect()
語法:RDD.collect()
行動操做 collect() 向 Spark 驅動器進程返回一個由 RDD 中全部元素組成的列表。collect() 沒有限制輸出,可能致使輸出量至關大。通常只用在小規模 RDD 或開發中。
# collect()
licenses = sc.textFile('file:///usr/local/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
print(words.collect())
# 輸出
['', '<!DOCTYPE', 'HTML', 'PUBLIC', '"-//W3C//DTD', 'HTML', '4.01',......]
3.take()
語法:RDD.take(n)
行動操做 take() 返回 RDD 的前 n 個元素。選取的元素沒有特定的順序。事實上,行動操做 take() 返回的元素是不肯定的,這意味着再次運行同一個行動操做時,返回的元素可能會不一樣,尤爲是在徹底分佈式的環境中。
對於橫跨超過一個分區的 RDD,take() 會掃描一個分區,並使用該分區的結果來預估還需掃描多少分區才能知足獲取所要求數量的所有的值。
# take()
licenses = sc.textFile('file:///usr/local/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
print(words.take(5))
# 輸出
['', '<!DOCTYPE', 'HTML', 'PUBLIC', '"-//W3C//DTD']
4.top()
語法:RDD.top(n,key=None)
行動操做 top() 返回一個 RDD 中的前 n 個元素,可是和 take() 不一樣的是,若是使用 top(),元素會排序並按照降序輸出。參數 key 指定了按照什麼對結果進行排序以返回前 n 個元素。若是沒有提供,會使用根據 RDD 的元素所推斷出來的鍵。
# top()
licenses = sc.textFile('file:///usr/local/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
print(words.distinct().top(5))
# 輸出
['·', '©', '}', 'your', 'you.</strong>']
5.first()
語法:RDD.first()
行動操做 first() 返回 RDD 的第一個元素。first() 不考慮元素的順序,是一個非肯定性的操做,尤爲是在徹底分佈式的環境中。
# first()
readme = sc.textFile('file:///usr/local/spark/README.md')
words = readme.flatMap(lambda x:x.split(' ')).filter(lambda x:len(x) > 0)
print(words.distinct().first())
print(words.distinct().take(1))
# 輸出
#
['#']
first() 和 take(1) 最主要的區別在於 first() 返回一個原子的數據元素,而 take() (即便 n=1)返回的是由數據元素組成的列表。
6.reduce() 和 fold()
行動操做 reduce() 和 fold() 是執行聚合的行動操做,它們都執行知足交換律或結合律的操做,好比對 RDD 裏的一系列值求和。這裏的交換律和結合律表示操做與執行的順序無關。這是分佈式處理所要求的,由於在分佈式處理中,順序沒法保證。
語法:RDD.reduce(<function>)
RDD.fold(zeroValue,<function>)
行動操做 reduce() 使用指定的知足交換律或結合律的運算符來歸約 RDD 中的全部元素。參數 <function> 指定接收兩個輸入的匿名函數(lambda x,y:....),它表示來自指定 RDD 的序列中的值。
行動操做 fold() 使用給定的 function 和 zeroValue 把 RDD 中每一個分區的元素聚合,而後把每一個分區的聚合結果再聚合。儘管 reduce() 和 fold() 的功能類似,但仍是有區別的,fold() 不知足交換律,所以須要給定第一個值和最後一個值(zeroValue)。
# reduce(),fold()
numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])
print(numbers.reduce(lambda x,y: x+y)) #輸出:45
print(numbers.fold(0,lambda x,y: x+y)) #輸出:45
empty = sc.parallelize([])
# print(empty.reduce(lambda x,y: x+y)) #輸出:ValueError: Can not reduce() empty RDD
print(empty.getNumPartitions()) #查看rdd分區數,輸出爲 2
print(empty.fold(1,lambda x,y: x+y)) #輸出:3
fold中zeroValue除了在每一個分區計算中做爲初始值使用以後,在最後的reduce操做仍然須要使用一次,因此fold()在zeroValue不爲0是計算結果爲reduce()+(分區數+1)*zeroValue(以加法爲例),參考連接:Pyspark學習筆記,fold()官網源碼
7.foreach()
語法:RDD.foreach(<function>)
行動操做 foreach() 把 <> 參數指定的具名或匿名函數應用到 RDD 中的全部元素上。由於 foreach() 是行動操做而不是轉化操做,可使用在轉化操做中沒法使用或不應使用的函數。
# foreach()
def printfunc(x):
print(x)
licenses = sc.textFile('file:///usr/local/spark/licenses')
longwords = licenses.flatMap(lambda x: x.split(' ')).filter(lambda x: len(x) > 12)
longwords.foreach(lambda x: printfunc(x))
# 輸出
...
</dt><dd>means
href="#exhibit-a">Exhibit
id="section-1.10.1">1.10.1.
...
鍵值對 RDD 的轉化操做
鍵值對 RDD,也就是 PairRDD,它的記錄由鍵和值組成。鍵能夠是整型或者字符串對象,也能夠是元組這樣的複雜對象。而值能夠是標量值,也能夠是列表、元組、字典或集合等數據結構。這是在讀時系統和 NoSQL 系統上進行各類結構化數據分析時經常使用的數據表示形式。PairRDD 及其成員函數大體被分爲以下四類:
字典函數
函數式轉化操做
分組操做、聚合操做與排序操做
鏈接操做
字典函數返回鍵值對 RDD 的鍵的集合或值的集合,好比 keys() 和 values()。
1.keys()
語法:RDD.keys()
keys() 函數返回鍵值對 RDD 中全部鍵組成的 RDD,或者說是由鍵值對 RDD 中每一個二元組的第一個元素組成的 RDD。
# keys()
kvpairs = sc.parallelize([('city','Beijing'),('state','SHIGEZHUANG'),('zip','000000'),('country','China')])
print(kvpairs.keys().collect())
# 輸出
['city', 'state', 'zip', 'country']
2.values()
語法:RDD.values()
values() 函數返回鍵值對 RDD 中全部值組成的 RDD,或者說是由鍵值對 RDD 中每一個二元組的第二個元素組成的 RDD。
# values()
kvpairs = sc.parallelize([('city','Beijing'),('state','SHIGEZHUANG'),('zip','000000'),('country','China')])
print(kvpairs.values().collect())
# 輸出
['Beijing', 'SHIGEZHUANG', '000000', 'China']
3.keyBy()
語法:RDD.keyBy(<function>)
轉化操做 keyBy() 建立出由從 RDD 中的元素裏提取的鍵與值組成的元組,其中 <function> 參數給定的函數將原元素轉爲輸出元素的鍵,而原來的整個元組是輸出的值。
# keyBy()
locations = sc.parallelize([('city','Beijing',1),('state','SHIGEZHUANG',2),('zip','000000',3),('country','China',4)])
bylocno = locations.keyBy(lambda x: x[2])
print(bylocno.collect())
# 輸出
[(1, ('city', 'Beijing', 1)), (2, ('state', 'SHIGEZHUANG', 2)), (3, ('zip', '000000', 3)), (4, ('country', 'China', 4))]
4.mapValues()
語法:RDD.mapValues(<function>)
轉化操做 mapValues() 把鍵值對 RDD 的每一個值都傳給一個函數(經過 <function> 參數指定的具名函數或匿名函數),而鍵保持不變。mapValues() 對於每一個輸入元素輸出一個元素。原 RDD 的分區保持不變。
5.flatMapValues()
語法:RDD.flatMapValues(<function>)
轉化操做 flatMapValues() 把鍵值對 RDD 的每一個值都傳給一個函數處理,而鍵保持不變,並生成拍平的列表。對於每一個輸入元素,返回 0 個乃至多個輸出元素。使用 flatMapValues() 是會保留原 RDD 的分區狀況。
# mapValues(),flatMapValues()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
print('kvpairs: ',kvpairs.take(4))
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
print('locwtemplist: ',locwtemplist.take(3))
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
print('locwtemps: ',locwtemps.take(4))
# 輸出
kvpairs: [['Beijing', '71|72|73|72|70'], ['Shanghai', '46|42|40|37|39'], ['Tianjin', '50|48|51|43|44']]
locwtemplist: [('Beijing', [71, 72, 73, 72, 70]), ('Shanghai', [46, 42, 40, 37, 39]), ('Tianjin', [50, 48, 51, 43, 44])]
locwtemps: [('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72)]
6.groupByKey()
語法:RDD.groupByKey(numPartitions=None,partitionFunc=<hash_fn>)
轉化操做 groupByKey() 將鍵值對RDD 按各個鍵對值進行分組,把同組的值整合成一個序列。參數 numPartitions 指定要建立多少個分區(也就是多少個分組)。分區使用 partitionFunc 參數的值建立,默認值爲 Spark 內置的哈希分區函數。若是 numPartitions 爲默認值 None,就使用系統默認的分區數( spark.default.parallelism )。
# groupByKey()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
print('kvpairs: ',kvpairs.collect())
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
print('locwtemplist: ',locwtemplist.collect())
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
print('locwtemps: ',locwtemps.collect())
grouped = locwtemps.groupByKey()
print('grouped: ',grouped.collect())
avgtemps = grouped.mapValues(lambda x: sum(x)/len(x))
print('avgtemps: ',avgtemps.collect())
# 輸出
kvpairs: [['Beijing', '71|72|73|72|70'], ['Shanghai', '46|42|40|37|39'], ['Tianjin', '50|48|51|43|44']]
locwtemplist: [('Beijing', [71, 72, 73, 72, 70]), ('Shanghai', [46, 42, 40, 37, 39]), ('Tianjin', [50, 48, 51, 43, 44])]
locwtemps: [('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72), ('Beijing', 70), ('Shanghai', 46), ('Shanghai', 42), ('Shanghai', 40), ('Shanghai', 37), ('Shanghai', 39), ('Tianjin', 50), ('Tianjin', 48), ('Tianjin', 51), ('Tianjin', 43), ('Tianjin', 44)]
grouped: [('Beijing', <pyspark.resultiterable.ResultIterable object at 0x7f0c3353c2b0>), ('Shanghai', <pyspark.resultiterable.ResultIterable object at 0x7f0c3353c320>), ('Tianjin', <pyspark.resultiterable.ResultIterable object at 0x7f0c3353c2e8>)]
avgtemps: [('Beijing', 71.6), ('Shanghai', 40.8), ('Tianjin', 47.2)]
注意 groupByKey() 返回的分組後的值是一個 resultiterable 對象。Python 中的 iterable 對象是能夠循環遍歷的序列對象。Python 中的許多函數接受可迭代對象做爲輸入,好比 sum() 和 len() 函數。
7.reduceByKey()
語法:RDD.reduceByKey(<function>,numPartitions=None,partitionFunc=<hash_fn>)
轉化操做 reduceByKey() 使用知足結合律的函數合併鍵對應的值。調用鍵值對數據集的 reduceByKey() 方法,返回的是鍵值對的數據集,其數據按照鍵聚合了對應的值。參數 numPartitions 和 partitionFunc 與使用 groupByKey() 函數時的用法如出一轍。numPartitions 的值還影響 saveAsTextFile() 或是其餘產生文件的行動操做所產生的文件數量。例如,numPartitions = 2 會把 RDD 保存在硬盤時共生成兩個輸出文件。
# reduceByKey()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
temptups = locwtemps.mapValues(lambda x: (x,1))
print('temptups: ',temptups.collect())
inputstoavg = temptups.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
print('inputstoavg: ',inputstoavg.collect())
averages = inputstoavg.map(lambda x:(x[0],x[1][0]/x[1][1]))
print('averages: ',averages.collect())
# 輸出
temptups: [('Beijing', (71, 1)), ('Beijing', (72, 1)), ('Beijing', (73, 1)), ('Beijing', (72, 1)), ('Beijing', (70, 1)), ('Shanghai', (46, 1)), ('Shanghai', (42, 1)), ('Shanghai', (40, 1)), ('Shanghai', (37, 1)), ('Shanghai', (39, 1)), ('Tianjin', (50, 1)), ('Tianjin', (48, 1)), ('Tianjin', (51, 1)), ('Tianjin', (43, 1)), ('Tianjin', (44, 1))]
inputstoavg: [('Beijing', (358, 5)), ('Shanghai', (204, 5)), ('Tianjin', (236, 5))]
averages: [('Beijing', 71.6), ('Shanghai', 40.8), ('Tianjin', 47.2)]
求平均值不是知足結合律的操做,能夠經過建立元組來繞過去,元組中包含每一個鍵對應的值和與每一個鍵對應的計數,這兩個都知足交換律和結合律,而後在最後一步計算平均值。
注意 reduceByKey() 比較高效,是由於它在每一個執行器本地對值進行了先行組合,而後把組合後的列表發送到遠程的執行器來執行最後的階段。這是一個會產生數據混洗的操做。
以求和函數爲例,能夠看成是累加一個由和組成的列表,而不是對單個值組成的更大的列表進行求和。由於在數據混洗時發送的數據更少,使用 reduceByKey() 進行求和通常要比使用 groupByKey() 並指定 sum() 函數的性能更好。
8.foldByKey()
語法:RDD.foldByKey(zeroValue,<function>,numPartitions=None,partitionFunc=<hash_fn>)
轉化操做 foldByKey() 在功能上和行動操做 fold() 相似,可是 foldByKey() 是轉化操做,操做預先定義的鍵值對元素。foldByKey() 和 fold() 都提供了相同數據類型的 zeroValue 參數供 RDD 爲空時使用。參數 numPartitions 和 partitionFunc 與轉化操做 groupByKey() 和 reduceByKey() 中的做用同樣。
# foldByKey()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
maxbycity = locwtemps.foldByKey(0,lambda x,y: x if x > y else y)
print('maxbycity: ',maxbycity.collect())
# 輸出
maxbycity: [('Beijing', 73), ('Shanghai', 46), ('Tianjin', 51)]
9.sortByKey()
語法:RDD.sortByKey(ascending=True,numPartitions=None,keyfunc=<function>)
轉化操做 sortByKey() 把鍵值對 RDD 根據鍵進行排序。排序依據取決於鍵對象的類型。該操做與 sort() 的區別之處在於 sort() 要求指定排序依據的鍵,而 sortByKey() 的鍵是鍵值對 RDD 裏定義的。
鍵按照 ascending 參數提供的順序進行排序,該參數默認值爲 True,表示升序。參數 numPartitions 指定了輸出多少分區,分區函數爲範圍分區函數。參數 keyfunc 是一個可選參數,能夠經過對原鍵使用另外一個函數而修改原鍵。
# sortByKey()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
sortedbykey = locwtemps.sortByKey()
print('sortedbykey: ',sortedbykey.collect())
sortedbyval = locwtemps.map(lambda x: (x[1],x[0])).sortByKey(ascending=False)
print('sortedbyval: ',sortedbyval.collect())
# 輸出
sortedbykey: [('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72), ('Beijing', 70), ('Shanghai', 46), ('Shanghai', 42), ('Shanghai', 40), ('Shanghai', 37), ('Shanghai', 39), ('Tianjin', 50), ('Tianjin', 48), ('Tianjin', 51), ('Tianjin', 43), ('Tianjin', 44)]
sortedbyval: [(73, 'Beijing'), (72, 'Beijing'), (72, 'Beijing'), (71, 'Beijing'), (70, 'Beijing'), (51, 'Tianjin'), (50, 'Tianjin'), (48, 'Tianjin'), (46, 'Shanghai'), (44, 'Tianjin'), (43, 'Tianjin'), (42, 'Shanghai'), (40, 'Shanghai'), (39, 'Shanghai'), (37, 'Shanghai')]
鏈接操做
1.join()
語法:RDD.join(<otherRDD>,numPartitions=None)
轉化操做 join() 是內鏈接的一個實現,根據鍵來匹配兩個鍵值對 RDD。可選參數 numPartitions 決定生成的數據集要建立多少分區。若是不指明這個參數,缺省值爲 spark.default.parallelism 配置參數對應的值。返回的 RDD 是一個列表,其結構包含匹配鍵,以及一個二元組。這個二元組包含來自兩個 RDD 的一組匹配記錄。
# 鏈接操做
stores = sc.parallelize([(100,'Beijing'),(101,'Shanghai'),(102,'Tianjin'),(103,'Taiyuan')])
salespeople = sc.parallelize([(1,'Tom',100),(2,'Karen',100),(3,'Paul',101),(4,'Jimmy',102),(5,'Jack',None)])
# join()
print(salespeople.keyBy(lambda x: x[2]).join(stores).collect())
# 輸出
[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin'))]
2.leftOuterJoin()
語法:RDD.leftOuterJoin(<otherRDD>,numPartitions=None)
轉化操做 leftOuterJoin() 返回第一個 RDD 中包含的全部記錄或元素。若是第一個 RDD(左 RDD)中的鍵在右 RDD 中存在,那麼右 RDD 中匹配的記錄會和左 RDD 的記錄一塊兒返回。不然,右 RDD 的記錄爲空。
# leftOuterJoin()
leftjoin = salespeople.keyBy(lambda x: x[2]).leftOuterJoin(stores)
print("leftjoin: ",leftjoin.collect())
print(leftjoin.filter(lambda x: x[1][1] is None).map(lambda x: "salesperson " + x[1][0][1] + " has no store").collect())
# 輸出
leftjoin: [(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (None, ((5, 'Jack', None), None)), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin'))]
['salesperson Jack has no store']
3.rightOuterJoin()
語法:RDD.rightOuterJoin(<otherRDD>,numPartitions=None)
轉化操做 rightOuterJoin() 返回第二個 RDD 中包含的全部元素或記錄。若是第二個 RDD(右 RDD)中的鍵在左 RDD 中存在,則左 RDD 中匹配的記錄會和右 RDD 中的記錄一塊兒返回。不然,左 RDD 的記錄爲 None(空)。
# rightOuterJoin()
print(
salespeople.keyBy(lambda x: x[2])\
.rightOuterJoin(stores)\
.filter(lambda x: x[1][0] is None)\
.map(lambda x: x[1][1] + " store has no salespeople")\
.collect()
)
# 輸出
['Taiyuan store has no salespeople']
4.fullOuterJoin()
語法:RDD.fullOuterJoin(<otherRDD>,numPartitions=None)
fullOuterJoin() 不管是否有匹配的鍵,都會返回兩個 RDD 中的全部元素。左數據集或右數據集中沒有匹配的元素都用 None(空)來表示。
# fullOuterJoin
print(
salespeople.keyBy(lambda x: x[2]).fullOuterJoin(stores).filter(lambda x: x[1][0] is None or x[1][1] is None).collect()
)
# 輸出
[(None, ((5, 'Jack', None), None)), (103, (None, 'Taiyuan'))]
print(
salespeople.keyBy(lambda x: x[2]).fullOuterJoin(stores).collect()
)
# 輸出
[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (None, ((5, 'Jack', None), None)), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin')), (103, (None, 'Taiyuan'))]
5.cogroup()
語法:RDD.cogroup(<otherRDD>,numPartitions=None)
轉化操做 cogroup() 將多個鍵值對數據集按鍵進行分組。在概念上和 fullOuterJoin() 有些相似,但在實現上有如下關鍵區別:
轉化操做 cogroup() 返回可迭代對象,相似 groupByKey() 函數。
轉化操做 cogroup() 將兩個 RDD 中的多個元素進行分組,而 fullOuterJoin() 則對同一個鍵建立出多個分開的輸出元素。
轉化操做 cogroup() 能夠經過 Scala API 或者函數別名 groupWith() 對三個以上的 RDD 進行分組。
對 A、B 兩個 RDD 按照鍵 K 進行 cogroup() 操做生成的 RDD 輸出具備下面的結構:
[K, Iterable(K,VA,...), Iterable(K,VB,...) ]
若是一個 RDD 中沒有另外一個 RDD 中包含的給定鍵的值,相應的可迭代對象則爲空。
# cogroup()
print(salespeople.keyBy(lambda x: x[2]).cogroup(stores).take(1))
print('----------------')
print(salespeople.keyBy(lambda x: x[2]).cogroup(stores).mapValues(lambda x: [item for sublist in x for item in sublist]).collect())
# 輸出
[(100, (<pyspark.resultiterable.ResultIterable object at 0x7fc1005319b0>, <pyspark.resultiterable.ResultIterable object at 0x7fc100531ba8>))]
----------------
[(100, [(1, 'Tom', 100), (2, 'Karen', 100), 'Beijing']), (None, [(5, 'Jack', None)]), (101, [(3, 'Paul', 101), 'Shanghai']), (102, [(4, 'Jimmy', 102), 'Tianjin']), (103, ['Taiyuan'])]
6.cartesian()
語法:RDD.cartesian(<otherRDD>)
轉化操做 cartesian() 即笛卡爾集,有時也被稱爲交叉鏈接,會根據兩個 RDD 的記錄生成全部可能的組合。該操做生成的記錄條數等於第一個 RDD 的記錄條數乘以第二個 RDD 的記錄條數。
# cartesian()
print(salespeople.keyBy(lambda x: x[2]).cartesian(stores).collect())
print('----------------')
print(salespeople.keyBy(lambda x: x[2]).cartesian(stores).count())
# 輸出
[((100, (1, 'Tom', 100)), (100, 'Beijing')), ((100, (1, 'Tom', 100)), (101, 'Shanghai')), ((100, (2, 'Karen', 100)), (100, 'Beijing')), ((100, (2, 'Karen', 100)), (101, 'Shanghai')), ((100, (1, 'Tom', 100)), (102, 'Tianjin')), ((100, (1, 'Tom', 100)), (103, 'Taiyuan')), ((100, (2, 'Karen', 100)), (102, 'Tianjin')), ((100, (2, 'Karen', 100)), (103, 'Taiyuan')), ((101, (3, 'Paul', 101)), (100, 'Beijing')), ((101, (3, 'Paul', 101)), (101, 'Shanghai')), ((102, (4, 'Jimmy', 102)), (100, 'Beijing')), ((102, (4, 'Jimmy', 102)), (101, 'Shanghai')), ((None, (5, 'Jack', None)), (100, 'Beijing')), ((None, (5, 'Jack', None)), (101, 'Shanghai')), ((101, (3, 'Paul', 101)), (102, 'Tianjin')), ((101, (3, 'Paul', 101)), (103, 'Taiyuan')), ((102, (4, 'Jimmy', 102)), (102, 'Tianjin')), ((102, (4, 'Jimmy', 102)), (103, 'Taiyuan')), ((None, (5, 'Jack', None)), (102, 'Tianjin')), ((None, (5, 'Jack', None)), (103, 'Taiyuan'))]
----------------
20
集合操做
1.union()
語法:RDD.union(<otherRDD>)
轉化操做 union() 將另外一個 RDD 追加到 RDD 的後面,組合成一個輸出 RDD。兩個 RDD 不必定要有相同的結構。若是兩個輸入 RDD 有相同的記錄,轉化操做 union() 不會從輸出 RDD 中過濾這些重複的數據。
>>> fibonacci = sc.parallelize([0,1,2,3,5,8])
>>> odds = sc.parallelize([1,3,5,7,9])
>>> odds.union(fibonacci).collect()
[1, 3, 5, 7, 9, 0, 1, 2, 3, 5, 8]
2.intersection()
語法:RDD.intersection(<otherRDD>)
轉化操做 intersection() 返回兩個 RDD 中共有的元素。也就是該操做會返回兩個集合中共有的元素。返回的元素或者記錄必須在兩個集合中是如出一轍的,須要記錄的數據結構和每一個字段都對的上。
>>> fibonacci = sc.parallelize([0,1,2,3,5,8])
>>> odds = sc.parallelize([1,3,5,7,9])
>>> odds.intersection(fibonacci).collect()
[1, 3, 5]
3.subtract()
語法:RDD.subtract(<otherRDD>,numPartitions=None)
轉化操做 subtract() 會返回第一個 RDD 中全部沒有出如今第二個 RDD 中的元素。這是數學上的集合減法的一個實現。
>>> fibonacci = sc.parallelize([0,1,2,3,5,8])
>>> odds = sc.parallelize([1,3,5,7,9])
>>> odds.subtract(fibonacci).collect()
[7, 9]
4.subtractByKey()
語法:RDD.subtractByKey(<otherRDD>,numPartitions=None)
轉化操做 subtractByKey() 是一個和 subtract 相似的集合操做。subtractByKey() 操做返回一個鍵值對 RDD 中全部在另外一個鍵值對 RDD 中沒有對應鍵的元素。參數 numPartitions 能夠指定生成的結果 RDD 包含多少個分區,缺省值爲配置項 spark.default.parallelism 的值。
>>> cities1 = sc.parallelize([('Hayward',(37.668819,-122.080795)),
... ('Baumholder',(49.6489,7.3975)),
... ('Alexandria',(38.820450,-77.050552)),
... ('Melbourne',(37.663712,144.844788))])
>>> cities2 = sc.parallelize([('Boulder Creek',(64.0708333,-148.2236111)),
... ('Hayward',(37.668819,-122.080795)),
... ('Alexandria',(38.820450,-77.050552)),
... ('Arlington',(38.878337,-77.100703))])
>>> cities1.subtractByKey(cities2).collect()
[('Melbourne', (37.663712, 144.844788)), ('Baumholder', (49.6489, 7.3975))]
>>> cities2.subtractByKey(cities1).collect()
[('Boulder Creek', (64.0708333, -148.2236111)), ('Arlington', (38.878337, -77.100703))]
數值型 RDD 的操做
數值型 RDD 僅由數值組成,經常使用於統計分析。
>>> numbers = sc.parallelize([0,1,0,1,2,3,4,5,6,7,8,9])
>>> numbers.min() #最小值
0
>>> numbers.max() #最大值
9
>>> numbers.mean() #算術平均數
3.8333333333333335
>>> numbers.sum() #求和
46
>>> numbers.stdev() #標準差
3.0230595245361753
>>> numbers.variance() #方差
9.138888888888888
>>> numbers.stats() #返回 StatCounter 對象,一次調用得到一個包括 count()、mean()、stdev()、max()、min() 的結構
(count: 12, mean: 3.83333333333, stdev: 3.02305952454, max: 9.0, min: 0.0)