RDD編程詳解

RDD即彈性分佈式數據集(Resilient Distributed Dataset),其實就是分佈式的元素集合。就像一個文件中的所有內容、一個數據庫表中的所有記錄,或者是一個列表中的全部元素、一個字典中的全部鍵值對,不一樣於常規對象的是他們中單個對象的內容被分佈式的保存在集羣的不一樣節點中,在計算時也能夠在多個節點上運行。 在spark中,對數據的全部操做都不外乎建立RDD、轉化已有RDD和調用RDD操做進行求值。python

建立RDD操做
用戶可使用兩種方法來建立RDD:讀取一個外部數據集,或者在驅動器程序裏分發驅動器程序中的對象集合。在Spark中進行的操做都是SparkContext的內部方法,經過SparkContext實例化對象實現,常見的建立操做有:數據庫

>>> lines = sc.textFile("README.md") #讀取文本文件做爲一個字符串RDD,文件中每一行的內容爲一個元素。
>>> lines = sc.parallelize(["pandas", "i like pandas"]) #建立RDD的最簡單方式

轉化RDD操做
轉化操做會由一個RDD生成一個新的RDD
當咱們對RDD進行轉化操做時不會當即執行,相反Spark會在內部記錄下所要求執行的操做的相關信息。因此咱們不該該把RDD看作存放着特定數據的數據集,而最好把每一個RDD當作咱們經過轉化操做構建出來的,記錄如何計算數據的指令列表。分佈式

>>> errorsRDD = inputRDD.filter(lambda line: "error" in line) #篩選包含error字符串的行並組成新RDD 
>>> warningsRDD = inputRDD.filter(lambda line: "warning" in line)
>>> badlinesRDD = errorsRDD.union(warningsRDD) #合併兩個RDD中的元素
>>> tenLines = sc.parallelize(tenLines) #將list轉化成RDD對象
>>> tenLinesList = tenLines.collect() #collect用來將RDD中對象轉化成列表,注意普通的列表對象只能存儲在本機,因此使用collect以前要確保本機內存夠用

調用RDD行動操做
行動操做是向驅動器程序返回結果或把結果寫入外部系統的操做,會觸發實際的計算。函數

>>> pythonLines.persist() #將對象持久化到內存中,默認狀況下每次進行spark的行動操時會從新計算變量,當須要屢次調用某一對象時能夠這麼作
>>> pythonLines.first() #提取第一個元素實際操做時只讀取pythonLines的第一個元素,並非加載完pythonLines變量而後取第一個元素
>>> pythonLines.count() #統計元素個數
16
>>> nums = sc.parallelize([1,2,3,3,4])
>>> nums.countByValue()	#統計元素的出現頻率
defaultdict(int, {1: 1, 2: 1, 3: 2, 4: 1})
>>> nums.take(2)	#take方法能提取RDD中的n個元素,其會盡可能訪問少的節點,因此獲得的集合會不均衡,該方法的返回值類型是普通集合
[1,2]
>>> nums.top(2)	#返回RDD中順序最靠前的n個元素
[4,3]
>>> nums.takeSample(True, 3, 2)	#隨機取樣函數,第一個參數指定是否容許重複取樣,第二個參數指定樣本容量,第三個參數爲取樣的隨機數種子
[1,4,4]

雖然你能夠在任什麼時候候定義新的RDD,可是spark只會惰性計算這些RDD。他們只有第一次在一個行動操做中用到時纔會真正進行計算。在大數據領域中,對象的內容會十分龐大,這種作法能夠有效減小加載到內存中的變量大小。 每當咱們調用一個新的行動操做的時候,整個RDD都會從頭開始計算,要避免這種重複低效的行爲須要咱們在合適的場景下將中間結果手動持久化。
Spark的一些轉化和行動操做都須要依賴用戶傳遞的函數來計算。在Python中有三種方式把函數傳遞給Spark:lambda表達式、傳遞頂層函數和調用定義的局部函數。大數據

>>> word = rdd.filter(lambda s: "error" in s)
>>> def containsError(s):
>>> return "error" in s
>>> word = rdd.filter(containsError)

>>> nums = sc.parallelize([1,2,3,4])
>>> squared = nums.map(lambda x: x*x).collect()	#相似於python中map函數的用法
>>> for num in squared:
>>>     print(num)

>>> lines = sc.parallelize(["hello world", "hi girl"])
>>> words = lines.flatMap(lambda line: line.split(" "))	#flatMap()的做用是將全部的輸出結果聚合到一個迭代器能夠訪問的RDD中
>>> for word in words.collect():
>>>     print(word)

>>> rdd1 = ["coffee", "coffee", "tea", "juice", "cola"]
>>> rdd2 = ["tea", "juice", "water"]
>>> rdd1.distinct()	#RDD元素去重操做
["coffee", "tea", "juice", "cola"]
>>> rdd1.union(rdd2)	#兩個RDD對象中元素的並集
["coffee", "coffee", "tea", "juice", "cola", "water"]
>>> rdd1.intersection(rdd2)	#計算兩個RDD中元素的交集
["tea", "juice"]
>>> rdd1.substract(rdd2)	#計算兩個RDD中元素的差集
["coffee", "cola"]

>>> nums = sc.parallelize([1,2,3,4,5])
>>> nums.reduce(lambda x, y: x+y)	#將第一個元素與第二個元素做爲形參傳入函數,將計算出來的結果做爲第一個形參,第三個元素做爲第二個形參繼續進行迭代計算,以此類推計算出最終值,這裏至關於計算1+2+3+4+5
15
>>> nums.aggregate((0, 0), lambda acc, value: (acc[0] + value, acc[1] + 1), lambda
 acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))	#aggregate的用法與reduce相似,不一樣之處在於reduce的輸出值只能和輸入值保持同種類型,可是aggregate能夠自定義結果類型,第一個參數指定默認值,第二個參數指定各個節點上數據的處理函數(同reduce函數參數),第三個參數指定各節點計算結果的合併方法
(15, 5)
>>> nums.fold(8, lambda x, y: [x, y])	#fold方法是簡化版的aggregate,不可更改輸出結果類型,計算原理相似於aggregate,將默認值做爲第一個參數參與迭代計算,計算完畢後與其餘節點上的默認值進行迭代合併
[8, [[[[[8, 1], 2], 3], 4], 5]]

傳遞函數時須要注意的是,Python會在你不經意間把函數所在的類對象也序列化傳遞出去。當你傳遞的對象是某個類對象的成員,或者包含了對某個類對象中一個字段的引用時,Spark就會把整個類對象發到工做節點上,這可能會傳遞更大的數據量。有時候傳遞的類裏面包含Python不知道的對象,也會致使程序報錯。ui

class SearchFunctions(object):
	def __init__(self, query):
		self.query = query
	def isMatch(self, s):
		return self.query in s
	def getMatchesFunctionReference(self, rdd):
		return rdd.filter(self.isMatch) #這裏在「self.isMatch」中引用了整個self

替代的方案是隻把須要的字段從對象中拿出來放到一個局部變量中,而後傳遞這個局部變量。spa

class WorldFunctions(object):
	def getMatchesNoReference(self, rdd):
		query = self.query
		return rdd.filter(lambda x: querty in x)
相關文章
相關標籤/搜索