如下是我的理解,一切以官網文檔爲準。html
http://spark.apache.org/docs/latest/api/python/pyspark.htmljava
在開始以前,我先介紹一下,RDD是什麼?python
RDD是Spark中的抽象數據結構類型,任何數據在Spark中都被表示爲RDD。從編程的角度來看,RDD能夠簡單當作是一個數組。和普通數組的區別是,RDD中的數據是分區存儲的,這樣不一樣分區的數據就能夠分佈在不一樣的機器上,同時能夠被並行處理。所以,Spark應用程序所作的無非是把須要處理的數據轉換爲RDD,而後對RDD進行一系列的變換和操做從而獲得結果。apache
建立RDD:編程
>>> sc.parallelize([1,2,3,4,5], 3) #意思是將數組中的元素轉換爲RDD,而且存儲在3個分區上[1]、[2,3]、[4,5]。若是是4個分區:[1]、[2]、[3]、[4,5]
上面這種是數組建立,也能夠從文件系統或者HDFS中的文件建立出來,後面會講到。api
只要搞懂了spark的函數們,你就成功了一大半。數組
spark的函數主要分兩類,Transformations和Actions。Transformations爲一些數據轉換類函數,actions爲一些行動類函數:數據結構
轉換:轉換的返回值是一個新的RDD集合,而不是單個值。調用一個變換方法,不會有任何求值計算,它只獲取一個RDD做爲參數,而後返回一個新的RDD。app
行動:行動操做計算並返回一個新的值。當在一個RDD對象上調用行動函數時,會在這一時刻計算所有的數據處理查詢並返回結果值。分佈式
下面介紹spark經常使用的Transformations, Actions函數:
Transformations
map(func [, preservesPartitioning=False]) --- 返回一個新的分佈式數據集,這個數據集中的每一個元素都是通過func函數處理過的。
>>> data = [1,2,3,4,5] >>> distData = sc.parallelize(data).map(lambda x: x+1).collect() #結果:[2,3,4,5,6]
filter(func) --- 返回一個新的數據集,這個數據集中的元素是經過func函數篩選後返回爲true的元素(簡單的說就是,對數據集中的每一個元素進行篩選,若是符合條件則返回true,不符合返回false,最後將返回爲true的元素組成新的數據集返回)。
>>> rdd = sc.parallelize(data).filter(lambda x:x%2==0).collect() #結果:[2, 4]
flatMap(func [, preservesPartitioning=False]) --- 相似於map(func), 可是不一樣的是map對每一個元素處理完後返回與原數據集相同元素數量的數據集,而flatMap返回的元素數不必定和原數據集相同。each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item)
#### for flatMap() >>> rdd = sc.parallelize([2,3,4]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect()) #結果:[1, 1, 1, 2, 2, 3] >>> sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect()) #結果:[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] #### for map() >>> rdd = sc.parallelize([2,3,4]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect()) #結果:[[1], [1, 2], [1, 2, 3]] >>> sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect()) #結果:[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]
mapPartitions(func [, preservesPartitioning=False]) ---mapPartitions是map的一個變種。map的輸入函數是應用於RDD中每一個元素,而mapPartitions的輸入函數是應用於每一個分區,也就是把每一個分區中的內容做爲總體來處理的。
>>> rdd = sc.parallelize([1,2,3,4,5], 3) >>> def f(iterator): yield sum(iterator) >>> rdd.mapPartitions(f).collect() #結果:[1,5,9]
mapPartitionsWithIndex(func [, preservesPartitioning=False]) ---Similar to mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.
>>> rdd = sc.parallelize([1,2,3,4,5], 3) >>> def f(splitIndex, iterator): yield splitIndex >>> rdd.mapPartitionsWithIndex(f).collect() #結果:[0,1,2] #三個分區的索引
reduceByKey(func [, numPartitions=None, partitionFunc=<function portable_hash at 0x7fa664f3cb90>]) --- reduceByKey就是對元素爲kv對的RDD中Key相同的元素的value進行reduce,所以,key相同的多個元素的值被reduce爲一個值,而後與原RDD中的key組成一個新的kv對。
>>> from operator import add >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.reduceByKey(add).collect()) >>> #或者 sorted(rdd.reduceByKey(lambda a,b:a+b).collect()) #結果:[('a', 2), ('b', 1)]
aggregateByKey(zeroValue)(seqOp, combOp [, numPartitions=None]) ---
sortByKey([ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fa665048c80>]) --- 返回排序後的數據集。該函數就是隊kv對的RDD數據進行排序,keyfunc是對key進行處理的函數,如非須要,不用管。
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('D', 4)] >>> sc.parallelize(tmp).sortByKey(True, 1).collect() #結果: [('1', 3), ('D', 4), ('a', 1), ('b', 2)] >>> sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect() #結果:[('1', 3), ('a', 1), ('b', 2), ('D', 4)] #注意,比較兩個結果可看出,keyfunc對鍵的處理只是在數據處理的過程當中起做用,不能真正的去改變鍵名
join(otherDataset [, numPartitions=None]) --- join就是對元素爲kv對的RDD中key相同的value收集到一塊兒組成(v1,v2),而後與原RDD中的key組合成一個新的kv對,返回。
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2), ("a", 3)]) >>> sorted(x.join(y).collect()) #結果:[('a', (1, 2)), ('a', (1, 3))]
cartesian(otherDataset) --- 返回一個笛卡爾積的數據集,這個數據集是經過計算兩個RDDs獲得的。
>>> x = sc.parallelize([1,2,3]) >>> y = sc.parallelize([4,5]) >>> x.cartesian(y).collect() #結果:[(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
Action (這裏只講支持python的,java和scala的後面用到了在作詳解,固然支持python就必定支持java和scala)
reduce(func) --- reduce將RDD中元素兩兩傳遞給輸入函數,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函數直到最後只有一個值爲止。
>>> from operator import add >>> sc.parallelize([1,2,3,4,5]).reduce(add) # 結果:15
collect() --- 返回RDD中的數據,以list形式。
>>> sc.parallelize([1,2,3,4,5]).collect() #結果:[1,2,3,4,5]
count() --- 返回RDD中的元素個數。
>>> sc.parallelize([1,2,3,4,5]).count #結果:5
first() --- 返回RDD中的第一個元素。
>>> sc.parallelize([1,2,3,4,5]).first() #結果:1
take(n) --- 返回RDD中前n個元素。
>>> sc.parallelize([1,2,3,4,5]).take(2) #結果:[1,2]
takeOrdered(n [, key=None]) --- 返回RDD中前n個元素,可是是升序(默認)排列後的前n個元素,或者是經過key函數指定後的RDD(這個key我也沒理解透,後面在作詳解)
>>> sc.parallelize([9,7,3,2,6,4]).takeOrdered(3) #結果:[2,3,4] >>> sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x:-x) #結果:[9,7,6]
saveAsTextFile(path [, compressionCodecClass=None]) --- 該函數將RDD保存到文件系統裏面,而且將其轉換爲文本行的文件中的每一個元素調用 tostring 方法。
parameters: path - 保存於文件系統的路徑
compressionCodecClass - (None by default) string i.e. 「org.apache.hadoop.io.compress.GzipCodec」
>>> tempFile = NamedTemporaryFile(delete=True) >>> tempFile.close() >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) >>> from fileinput import input >>> from glob import glob >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
Empty lines are tolerated when saving to text files:
>>> tempFile2 = NamedTemporaryFile(delete=True) >>> tempFile2.close() >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) '\n\n\nbar\nfoo\n'
Using compressionCodecClass:
>>> tempFile3 = NamedTemporaryFile(delete=True) >>> tempFile3.close() >>> codec = "org.apache.hadoop.io.compress.GzipCodec" >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec) >>> from fileinput import input, hook_compressed >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)) >>> b''.join(result).decode('utf-8') u'bar\nfoo\n'
countByKey() --- 返回一個字典(key,count),該函數操做數據集爲kv形式的數據,用於統計RDD中擁有相同key的元素個數。
>>> defdict = sc.parallelize([("a",1), ("b",1), ("a", 1)]).countByKey() >>> defdict #結果:defaultdict(<type 'int'>, {'a': 2, 'b': 1}) >>> defdict.items() #結果:[('a', 2), ('b', 1)]
countByValue() --- 返回一個字典(value,count),該函數操做一個list數據集,用於統計RDD中擁有相同value的元素個數。
>>> sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items() #結果:[(1, 2), (2, 4), (3, 3), (5, 1)]
foreach(func) --- 運行函數func來處理RDD中的每一個元素,這個函數常被用來updating an Accumulator或者與外部存儲系統的交互。
>>> def f(x): print(x) >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) #note: 打印是隨機的,並非必定按1,2,3,4,5的順序打印