Spark (Python版) 零基礎學習筆記(二)—— Spark Transformations總結及舉例

1. map(func) 
func函數做用到數據集的每一個元素,生成一個新的分佈式的數據集並返回python

1 >>> a = sc.parallelize(('a', 'b', 'c'))
2 >>> a.map(lambda x: x+'1').collect()
3 ['a1', 'b1', 'c1']

 

2. filter(func) 
選出全部func返回值爲true的元素,做爲一個新的數據集返回正則表達式

1 >>> a = sc.parallelize(range(10))
2 >>> a.filter(lambda x: x%2==0).collect()  # 選出0-9的偶數
3 [0, 2, 4, 6, 8]

 

3. flatMap(func) 
與map類似,可是每一個輸入的item可以被map到0個或者更多的items輸出,也就是說func的返回值應當是一個Sequence,而不是一個單獨的itemshell

1 >>> l = ['I am Tom', 'She is Jenny', 'He is Ben']
2 >>> a = sc.parallelize(l,3)
3 >>> a.flatMap(lambda line: line.split()).collect()  # 將每一個字符串中的單詞劃分出來
4 ['I', 'am', 'Tom', 'She', 'is', 'Jenny', 'He', 'is', 'Ben']

 

4. mapPartitions(func) 
與map類似,可是mapPartitions的輸入函數單獨做用於RDD的每一個分區(block)上,所以func的輸入和返回值都必須是迭代器iterator。 
例如:假設RDD有十個元素0~9,分紅三個區,使用mapPartitions返回每一個元素的平方。若是使用map方法,map中的輸入函數會被調用10次,而使用mapPartitions方法,輸入函數只會被調用3次,每一個分區被調用1次。apache

1 >>> def squareFunc(a):
2 . . .     for i in a:
3 . . .         yield i*i
4 . . .
5 >>> a = sc.parallelize(range(10), 3)
6 PythonRDD[1] at RDD at PythonRDD.scala:48
7 >>> a.mapPartitions(squareFunc).collect()
8 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

5. mapPartitionsWithIndex(func) 
與mapPartitions類似,可是輸入函數func提供了一個正式的參數,能夠用來表示分區的編號。bash

 1 >>> def func(index, iterator):  # 返回每一個分區的編號和數值
 2 . . .     yield (‘index ‘ + str(index) + ’ is: ‘ + str(list(iterator)))
 3 . . .
 4 >>> a = sc.parallelize(range(10),3)
 5 >>> a.mapPartitionsWithIndex(func).collect()
 6 ['index 0 is: [0, 1, 2]', 'index 1 is: [3, 4, 5]', 'index 2 is: [6, 7, 8, 9]']
 7 >>> def squareIndex(index, iterator):  # 返回每一個數值所屬分區的編號和數值的平方
 8 ...     for i in iterator:
 9 ...         yield ("The index is: " + str(index) + ", and the square is: " + str(i*i))
10 ... 
11 >>> a.mapPartitionsWithIndex(squareIndex).collect()
12 ['The index is: 0, and the square is: 0', 
'The index is: 0, and the square is: 1',
'The index is: 1, and the square is: 4',
'The index is: 1, and the square is: 9',
'The index is: 1, and the square is: 16',
'The index is: 2, and the square is: 25',
'The index is: 2, and the square is: 36',
'The index is: 3, and the square is: 49',
'The index is: 3, and the square is: 64',
'The index is: 3, and the square is: 81']

 

6. sample(withReplacementfractionseed) 
從數據中抽樣,withReplacement表示是否有放回,withReplacement=true表示有放回抽樣,fraction爲抽樣的機率(0<=fraction<=1),seed爲隨機種子。 
例如:從1-100之間抽取樣本,被抽取爲樣本的機率爲0.2網絡

1 >>> data = sc.parallelize(range(1,101),2)
2 >>> sample = data.sample(True, 0.2)
3 >>> sampleData.count()
4 19
5 >>> sampleData.collect()
6 [16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]

!!!注意,Spark中的sample抽樣,當withReplacement=True時,至關於採用的是泊松抽樣;當withReplacement=False時,至關於採用伯努利抽樣,fraction並非表示抽樣獲得的樣本佔原來數據總量的百分比,而是一個元素被抽取爲樣本的機率。fraction=0.2並非說明要抽出100個數字中20%的數據做爲樣本,而是每一個數字被抽取爲樣本的機率爲0.2,這些數字被認爲來自同一整體,樣本的大小並非固定的,而是服從二項分佈。併發

 

7. union(otherDataset) 
並集操做,將源數據集與union中的輸入數據集取並集,默認保留重複元素(若是不保留重複元素,能夠利用distinct操做去除,下邊介紹distinct時會介紹)。分佈式

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.union(data2).collect()
4 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]

 

8. intersection(otherDataset) 
交集操做,將源數據集與union中的輸入數據集取交集,並返回新的數據集。ide

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.intersection(data2).collect()
4 [8, 9, 6, 7]

 

9. distinct([numTasks]) 
去除數據集中的重複元素。函數

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.union(data2).distinct().collect()
4 [0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]
 

下邊的一系列transactions會用的鍵(Key)這一律念,在進行下列有關Key操做時使用的數據集爲記錄倫敦各個片區(英文稱爲ward)中學校和學生人數相關信息的表格,下載地址: 
https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685# 
下載後將其中命名爲WardtoSecSchool_LDS_2015的sheet裏邊的數據保存爲csv格式,刪除第一行的表頭,並從新命名爲school.csv 
數據格式爲: 
(Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count) 
首先對數據進行一些預處理:

1 >>> school = sc.textFile("file:///home/yang/下載/school.csv")  
2 Data = sc.textFile("file:///home/yang/下載/school.csv") 
3 >>> school.count()  # 共有16796行數據
4 16796
5 >>> import re  # 引入python的正則表達式包
6 >>> rows = school.map(lambda line: re.subn(',[\s]+',': ', line))

注意:1. 從本地讀取數據時,代碼中要經過 「file://」 前綴指定讀取本地文件。Spark shell 默認是讀取 HDFS 中的文件,須要先上傳文件到 HDFS 中,不然會有「org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/school.csv」的錯誤。 
2. 對數據集進行了一下預處理,利用正則匹配替換字符串,因爲一些學校的名字的字符串中自己含有逗號,好比「The City Academy, Hackney」, 此時若是利用csv的分隔符’,’進行分割,並不能將名字分割爲「The City Academy」和「Hackney」。咱們注意到csv的分隔符逗號後邊是沒有空格的,而名字裏邊的逗號後邊都會有空格(英語書寫習慣),所以,先利用re.subn語句對逗號後邊含有至少一個空格(正則表達式爲’,[\s]+’)的子字符串進行替換,替換爲’: ’,而後再進行後續操做。以上即爲對這一數據集的預處理過程。

 

10. groupByKey([numTasks]) 
做用於由鍵值對(K, V)組成的數據集上,將Key相同的數據放在一塊兒,返回一個由鍵值對(K, Iterable)組成的數據集。 
注意:1. 若是這一操做是爲了後續在每一個鍵上進行彙集(aggregation),好比sum或者average,此時使用reduceByKey或者aggregateByKey的效率更高。2. 默認狀況下,輸出的並行程度取決於RDD分區的數量,但也能夠經過給可選參數numTasks賦值來調整併發任務的數量。

1 >>> newRows = rows.map(lambda r: r[0].split(','))  
2 >>> ward_schoolname = newRows .map(lambda r: (r[1], r[5])).groupByKey()  # r[1]爲ward的名字,r[5]爲學校的名字
3 >>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect()  # 列出每一個ward區域內全部的學校的名字
4 [{'Stifford Clays': ['William Edwards School', 'Brentwood County High School', "The Coopers' Company and Coborn School", 'Becket Keys Church of England Free School', ...] 
# 輸出結果爲在Stifford Clays這個ward裏的學校有William Edwards School,Brentwood County High School,The Coopers' Company and Coborn School等等...

 

11. reduceByKey(func, [numTasks]) 
做用於鍵值對(K, V)上,按Key分組,而後將Key相同的鍵值對的Value都執行func操做,獲得一個值,注意func的類型必須知足

1 >>> pupils = newRows.map(lambda r: (r[1], int(r[6])))  # r[1]爲ward的名字,r[6]爲每一個學校的學生數
2 >>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y)   # 計算各個ward中的學生數
3 >>> ward_pupils.collect()  # 輸出各個ward中的學生數
4 [('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), 
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]

 

12. aggregateByKey(zeroValueseqOpcomOp, [numTasks]) 
在於鍵值對(K, V)的RDD中,按key將value進行分組合並,合併時,將每一個value和初始值做爲seqOp函數的參數,進行計算,返回的結果做爲一個新的鍵值對(K, V),而後再將結果按照key進行合併,最後將每一個分組的value傳遞給comOp函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給comOp函數,以此類推),將key與計算結果做爲一個新的鍵值對(K, V)輸出。 
例子: 上述統計ward內學生人數的操做也能夠經過aggregateByKey實現,此時,seqOpcomOp都是進行加法操做,代碼以下:

1 >>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
2 >>> ward_pupils.collect()  
3 [('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), 
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]

 

13. sortByKey([ascending=True], [numTasks]) 
按照Key進行排序,ascending的值默認爲True,True/False表示升序仍是降序 
例如:將上述ward按照ward名字降序排列,打印出前十個

1 >>> ward_pupils.sortByKey(False, 4).take(10)
2 [('Yiewsley', 2560), ('Wormholt and White City', 1455), ('Woodside', 1204), 
('Woodhouse', 2930), ('Woodcote', 1214), ('Winchmore Hill', 1116), ('Wilmington', 2243),
('Willesden Green', 1896), ('Whitefoot', 676), ('Whalebone', 2294)]

 

14. join(otherDataset, [numTasks]) 
相似於SQL中的鏈接操做,即做用於鍵值對(K, V)和(K, W)上,返回元組 (K, (V, W)),spark也支持外鏈接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin。例子:

 1 >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
 2 >>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
 3 >>> class1.join(class2).collect()
 4 [('Tom', ('attended', 'attended'))]
 5 >>> class1.leftOuterJoin(class2).collect()
 6 [('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None))]
 7 >>> class1.rightOuterJoin(class2).collect()
 8 [('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
 9 >>> class1.fullOuterJoin(class2).collect()
10 [('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None)), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]

 

15. cogroup(otherDataset, [numTasks]) 
做用於鍵值對(K, V)和(K, W)上,返回元組 (K, (Iterable, Iterable))。這一操做可叫作groupWith。

1 >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
2 >>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
3 >>> group = class1.cogroup(class2)
4 >>> group.collect()
5 [('John', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808afd0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a1d0>)), 
('Tom', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a7f0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a048>)),
('Jenny', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a9b0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a208>)),
('Bob', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808ae80>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b448d0>)),
('Amy', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44c88>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44588>)),
('Alice', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44748>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44f98>))] 6 >>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect() 7 [{'John': [[], ['attended']]}, {'Tom': [['attended'], ['attended']]}, {'Jenny': [['attended'], []]}, {'Bob': [['attended'], []]}, {'Amy': [[], ['attended']]}, {'Alice': [[], ['attended']]}]

 

16. cartesian(otherDataset) 
笛卡爾乘積,做用於數據集T和U上,返回(T, U),即數據集中每一個元素的兩兩組合

1 >>> a = sc.parallelize(('a', 'b', 'c'))
2 >>> b = sc.parallelize(('d', 'e', 'f'))
3 >>> a.cartesian(b).collect()
4 [('a', 'd'), ('a', 'e'), ('a', 'f'), ('b', 'd'), ('b', 'e'), ('b', 'f'), ('c', 'd'), ('c', 'e'), ('c', 'f')]

 

17. pipe(command, [envVars]) 
將驅動程序中的RDD交給shell處理(外部進程),例如Perl或bash腳本。RDD元素做爲標準輸入傳給腳本,腳本處理以後的標準輸出會做爲新的RDD返回給驅動程序。

 

18. coalesce(numPartitions) 
將RDD的分區數減少到numPartitions個。當數據集經過過濾規模減少時,使用這個操做能夠提高性能。

 

19. repartition(numPartitions) 
重組數據,數據被從新隨機分區爲numPartitions個,numPartitions能夠比原來大,也能夠比原來小,平衡各個分區。這一操做會將整個數據集在網絡中從新洗牌。

 

20. repartitionAndSortWithinPartitions(partitioner) 
根據給定的partitioner函數從新將RDD分區,並在分區內排序。這比先repartition而後在分區內sort高效,緣由是這樣迫使排序操做被移到了shuffle階段。

相關文章
相關標籤/搜索