Spark (Python) Study Notes for Beginners (2): Summary and Examples of Spark Transformations

1. map(func) 

>>> a = sc.parallelize(('a', 'b', 'c'))
>>> a.map(lambda x: x+'1').collect()
['a1', 'b1', 'c1']


2. filter(func) 

>>> a = sc.parallelize(range(10))
>>> a.filter(lambda x: x%2==0).collect()  # keep the even numbers in 0-9
[0, 2, 4, 6, 8]


3. flatMap(func) 

>>> l = ['I am Tom', 'She is Jenny', 'He is Ben']
>>> a = sc.parallelize(l, 3)
>>> a.flatMap(lambda line: line.split()).collect()  # split every string into its words
['I', 'am', 'Tom', 'She', 'is', 'Jenny', 'He', 'is', 'Ben']


4. mapPartitions(func) 

>>> def squareFunc(a):  # receives an iterator over one partition
...     for i in a:
...         yield i*i
...
>>> a = sc.parallelize(range(10), 3)
>>> a.mapPartitions(squareFunc).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


5. mapPartitionsWithIndex(func) 

>>> def func(index, iterator):  # yields each partition's index and its values
...     yield ('index ' + str(index) + ' is: ' + str(list(iterator)))
...
>>> a = sc.parallelize(range(10), 3)
>>> a.mapPartitionsWithIndex(func).collect()
['index 0 is: [0, 1, 2]', 'index 1 is: [3, 4, 5]', 'index 2 is: [6, 7, 8, 9]']
>>> def squareIndex(index, iterator):  # yields the partition index and the square of each value
...     for i in iterator:
...         yield ("The index is: " + str(index) + ", and the square is: " + str(i*i))
...
>>> a.mapPartitionsWithIndex(squareIndex).collect()
['The index is: 0, and the square is: 0',
'The index is: 0, and the square is: 1',
'The index is: 0, and the square is: 4',
'The index is: 1, and the square is: 9',
'The index is: 1, and the square is: 16',
'The index is: 1, and the square is: 25',
'The index is: 2, and the square is: 36',
'The index is: 2, and the square is: 49',
'The index is: 2, and the square is: 64',
'The index is: 2, and the square is: 81']


6. sample(withReplacement, fraction, seed) 

>>> data = sc.parallelize(range(1,101), 2)
>>> sampleData = data.sample(True, 0.2)  # sample with replacement, expected fraction 0.2
>>> sampleData.count()
19
>>> sampleData.collect()
[16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]



7. union(otherDataset) 

>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]


8. intersection(otherDataset) 

>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.intersection(data2).collect()
[8, 9, 6, 7]


9. distinct([numTasks]) 

>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).distinct().collect()
[0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]

The examples from here on use a CSV dataset, school.csv, with the following columns: (Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count) 

>>> school = sc.textFile("file:///home/yang/下載/school.csv")
>>> school.count()  # 16796 lines in total
16796
>>> import re  # Python's regular expression module
>>> rows = school.map(lambda line: re.subn(r',[\s]+', ': ', line))  # each element is a (string, count) tuple

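Notes: 1. To read data from the local filesystem, the path must carry the "file://" prefix. The Spark shell reads from HDFS by default, so without the prefix the file has to be uploaded to HDFS first; otherwise you get the error "org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/school.csv". 
2. The dataset is preprocessed with a regular-expression replacement because some school names contain a comma themselves, for example "The City Academy, Hackney". Splitting such a line on the CSV delimiter ',' would wrongly break the name into "The City Academy" and "Hackney". The CSV delimiter is never followed by a space, whereas a comma inside a name always is (the usual English convention). So re.subn first replaces every comma followed by at least one whitespace character (regular expression ',[\s]+') with ': ', and only then is the line split. That is the whole preprocessing step for this dataset.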
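
The preprocessing in note 2 can be tried without Spark. Below is a minimal sketch in plain Python; the sample row is made up for illustration (its ward code, URN and counts are not taken from school.csv), and only the school name comes from the text above.

```python
import re

# Hypothetical sample row: all values except the school name are invented.
line = "E05000001,Some Ward,100,1,123456,The City Academy, Hackney,25"

# A naive split breaks the school name into two fields.
naive = line.split(',')

# re.subn returns a (new_string, number_of_replacements) tuple,
# which is why the Spark code later indexes each row with [0].
cleaned, n = re.subn(r',\s+', ': ', line)
fields = cleaned.split(',')

print(len(naive))   # 8
print(len(fields))  # 7
print(fields[5])    # The City Academy: Hackney
```

Using re.sub instead of re.subn would return the string directly and make the later r[0] unnecessary; the notes keep re.subn, so the sketch does too.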


10. groupByKey([numTasks]) 
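Operates on a dataset of (K, V) pairs: values with the same key are put together, and a dataset of (K, Iterable<V>) pairs is returned. 
Notes: 1. If the grouping is only a step towards a per-key aggregation such as a sum or an average, reduceByKey or aggregateByKey is more efficient. 2. By default the level of parallelism of the output depends on the number of partitions of the parent RDD; pass the optional numTasks argument to set a different number of tasks.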

>>> newRows = rows.map(lambda r: r[0].split(','))  # r[0] is the string returned by re.subn
>>> ward_schoolname = newRows.map(lambda r: (r[1], r[5])).groupByKey()  # r[1] is the ward name, r[5] is the school name
>>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect()  # list the names of all schools in each ward
[{'Stifford Clays': ['William Edwards School', 'Brentwood County High School', "The Coopers' Company and Coborn School", 'Becket Keys Church of England Free School', ...] 
# The output says that the schools in the ward Stifford Clays include William Edwards School, Brentwood County High School, The Coopers' Company and Coborn School, and so on.


11. reduceByKey(func, [numTasks]) 
Operates on (K, V) pairs: groups them by key, then reduces all values that share a key to a single value with func. Note that func must be of type (V, V) => V.

>>> pupils = newRows.map(lambda r: (r[1], int(r[6])))  # r[1] is the ward name, r[6] is the number of pupils per school
>>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y)   # total number of pupils in each ward
>>> ward_pupils.collect()  # show the number of pupils in each ward
[('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), 
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]
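
To illustrate why reduceByKey is usually cheaper than groupByKey followed by a sum, here is a plain-Python model of the idea, not Spark's actual implementation. The function name reduce_by_key, the two hand-written partitions and the pupil counts are all assumptions made for this sketch. Values are combined inside each partition first, so only one partial result per key and partition would need to be shuffled.

```python
def reduce_by_key(partitions, func):
    """Toy model: combine inside each partition, then merge the partial results."""
    partials = []
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        partials.append(local)

    merged = {}
    for local in partials:
        for k, v in local.items():
            merged[k] = func(merged[k], v) if k in merged else v

    # Number of partial records that would cross partition boundaries.
    shuffled = sum(len(local) for local in partials)
    return merged, shuffled

# Hypothetical (ward, pupils) records spread over two partitions.
partitions = [
    [("Edgware", 100), ("Glyndon", 50), ("Edgware", 20)],
    [("Glyndon", 30), ("Edgware", 5)],
]
totals, shuffled = reduce_by_key(partitions, lambda x, y: x + y)
print(totals)    # {'Edgware': 125, 'Glyndon': 80}
print(shuffled)  # 4 partial records instead of the 5 original ones
```

Because the partial results are merged in no guaranteed order, func should be associative and commutative, as addition is here.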


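12. aggregateByKey(zeroValue, seqOp, combOp, [numTasks]) 
Aggregates the values of each key in an RDD of (K, V) pairs. Within each partition, seqOp folds the values of a key, one at a time, into an accumulator that starts at zeroValue. The per-partition accumulators of each key are then merged with combOp (the first two are combined, the result is combined with the next one, and so on). The key and the final result are output as a new pair (K, U), where the accumulator type U may differ from V. 
Example: the per-ward pupil count above can also be computed with aggregateByKey; here seqOp and combOp are both addition: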

>>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
>>> ward_pupils.collect()  
[('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), 
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]
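
When both functions are addition, the difference between seqOp and combOp is hard to see. The sketch below is a plain-Python model of the two phases, not Spark's implementation; the helper aggregate_by_key, the partitions and the pupil counts are assumptions made for illustration. It keeps a (sum, count) accumulator per key so that the two functions differ.

```python
def aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Toy model: seq_op inside each partition, comb_op across partitions."""
    partials = []
    for part in partitions:
        local = {}
        for k, v in part:
            # Start from zero_value the first time a key is seen in this partition.
            local[k] = seq_op(local.get(k, zero_value), v)
        partials.append(local)

    merged = {}
    for local in partials:
        for k, acc in local.items():
            merged[k] = comb_op(merged[k], acc) if k in merged else acc
    return merged

# Hypothetical (ward, pupils) records spread over two partitions.
partitions = [
    [("Edgware", 100), ("Glyndon", 50), ("Edgware", 20)],
    [("Glyndon", 30), ("Edgware", 5)],
]

seq_op = lambda acc, v: (acc[0] + v, acc[1] + 1)     # fold one value into (sum, count)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])    # merge two (sum, count) accumulators

sum_count = aggregate_by_key(partitions, (0, 0), seq_op, comb_op)
averages = {k: s / c for k, (s, c) in sum_count.items()}
print(sum_count)  # {'Edgware': (125, 3), 'Glyndon': (80, 2)}
```

In PySpark the same pair of functions could be passed as pupils.aggregateByKey((0, 0), seq_op, comb_op), followed by a mapValues step to divide the sum by the count.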


13. sortByKey([ascending=True], [numTasks]) 

>>> ward_pupils.sortByKey(False, 4).take(10)
[('Yiewsley', 2560), ('Wormholt and White City', 1455), ('Woodside', 1204), 
('Woodhouse', 2930), ('Woodcote', 1214), ('Winchmore Hill', 1116), ('Wilmington', 2243),
('Willesden Green', 1896), ('Whitefoot', 676), ('Whalebone', 2294)]


14. join(otherDataset, [numTasks]) 
Similar to a join in SQL: it operates on datasets of (K, V) and (K, W) pairs and returns tuples (K, (V, W)). Spark also supports outer joins through leftOuterJoin, rightOuterJoin and fullOuterJoin. Example:

>>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
>>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
>>> class1.join(class2).collect()
[('Tom', ('attended', 'attended'))]
>>> class1.leftOuterJoin(class2).collect()
[('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None))]
>>> class1.rightOuterJoin(class2).collect()
[('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
>>> class1.fullOuterJoin(class2).collect()
[('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None)), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
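
The join semantics can be modelled on ordinary Python lists. This is only a sketch of the behaviour, not of how Spark executes a join; inner_join and left_outer_join are hypothetical helper names, and the result order of a real RDD join is not guaranteed to match.

```python
def inner_join(left, right):
    """Keep only keys present on both sides; one output pair per matching combination."""
    return [(k, (v, w)) for k, v in left for k2, w in right if k == k2]

def left_outer_join(left, right):
    """Keep every left record; use None when the right side has no match."""
    out = []
    for k, v in left:
        matches = [w for k2, w in right if k2 == k]
        if matches:
            out.extend((k, (v, w)) for w in matches)
        else:
            out.append((k, (v, None)))
    return out

class1 = [(name, 'attended') for name in ('Tom', 'Jenny', 'Bob')]
class2 = [(name, 'attended') for name in ('Tom', 'Amy', 'Alice', 'John')]

print(inner_join(class1, class2))
# [('Tom', ('attended', 'attended'))]
print(left_outer_join(class1, class2))
# [('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None))]
```

rightOuterJoin is the mirror image (every right record is kept), and fullOuterJoin keeps the records of both sides.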


15. cogroup(otherDataset, [numTasks]) 
Operates on datasets of (K, V) and (K, W) pairs and returns tuples (K, (Iterable<V>, Iterable<W>)). This operation is also called groupWith.

>>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
>>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
>>> group = class1.cogroup(class2)
>>> group.collect()
[('John', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808afd0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a1d0>)), 
('Tom', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a7f0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a048>)),
('Jenny', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a9b0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a208>)),
('Bob', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808ae80>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b448d0>)),
('Amy', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44c88>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44588>)),
('Alice', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44748>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44f98>))]
>>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect()  # turn the iterables into lists so they can be read
[{'John': [[], ['attended']]}, {'Tom': [['attended'], ['attended']]}, {'Jenny': [['attended'], []]}, {'Bob': [['attended'], []]}, {'Amy': [[], ['attended']]}, {'Alice': [[], ['attended']]}]
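
What cogroup returns can be modelled on plain Python lists. The following is a sketch of the semantics only; the helper name cogroup_lists is an assumption, and keys come out in first-seen order here, whereas Spark makes no ordering promise.

```python
def cogroup_lists(left, right):
    """For every key on either side, collect its left values and its right values."""
    keys = []
    for k, _ in left + right:
        if k not in keys:
            keys.append(k)
    return [
        (k, ([v for k1, v in left if k1 == k],
             [w for k2, w in right if k2 == k]))
        for k in keys
    ]

class1 = [(name, 'attended') for name in ('Tom', 'Jenny', 'Bob')]
class2 = [(name, 'attended') for name in ('Tom', 'Amy', 'Alice', 'John')]

grouped = dict(cogroup_lists(class1, class2))
print(grouped['Tom'])    # (['attended'], ['attended'])
print(grouped['Jenny'])  # (['attended'], [])
print(grouped['John'])   # ([], ['attended'])
```

Unlike join, a key that exists on only one side is still present, paired with an empty collection for the other side.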


16. cartesian(otherDataset) 
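Cartesian product: operates on datasets of types T and U and returns a dataset of (T, U) pairs, that is, every pairing of an element of the first dataset with an element of the second.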

>>> a = sc.parallelize(('a', 'b', 'c'))
>>> b = sc.parallelize(('d', 'e', 'f'))
>>> a.cartesian(b).collect()
[('a', 'd'), ('a', 'e'), ('a', 'f'), ('b', 'd'), ('b', 'e'), ('b', 'f'), ('c', 'd'), ('c', 'e'), ('c', 'f')]
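
For comparison, the same pairing on ordinary Python lists can be written with itertools.product from the standard library. This is only a local analogy, not Spark code.

```python
from itertools import product

a = ['a', 'b', 'c']
b = ['d', 'e', 'f']

# Every element of a paired with every element of b.
pairs = list(product(a, b))
print(len(pairs))  # 9
print(pairs[:3])   # [('a', 'd'), ('a', 'e'), ('a', 'f')]
```

The result has len(a) * len(b) elements, which is why cartesian becomes expensive quickly on large datasets.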


17. pipe(command, [envVars]) 


18. coalesce(numPartitions) 


19. repartition(numPartitions) 


20. repartitionAndSortWithinPartitions(partitioner) 
