【原】Learning Spark (Python版) 學習筆記(一)----RDD 基本概念與命令

       《Learning Spark》這本書算是Spark入門的必讀書了,中文版是《Spark快速大數據分析》,不過豆瓣書評頗有意思的是,英文原版評分7.4,評論都說入門而已深刻不足,中文譯版評分8.4,評論一片好評,有點意思。我倒以爲這本書能夠做爲官方文檔的一個補充,刷完後基本上對Spark的一些基本概念、碼簡單的程序是沒有問題的了。這本書有一個好處是它是用三門語言寫的,Python/Java/Scala,因此適用性很廣,個人觀點是,先精通一門語言,再去學其餘語言。因爲我工做中比較經常使用的是Python,因此就用把Python相關的命令總結一下。下一階段再深刻學習Java和Scala。這一篇總結第一章-第三章的重點內容。shell

 
  說到Spark,就不得不提到RDD,RDD,字面意思是彈性分佈式數據集,其實就是分佈式的元素集合。Python的基本內置的數據類型有整型、字符串、元祖、列表、字典,布爾類型等,而Spark的數據類型只有RDD這一種,在Spark裏,對數據的全部操做,基本上就是圍繞RDD來的,譬如建立、轉換、求值等等。全部RDD的轉換都是lazy(惰性求值)的,RDD的轉換操做會生成新的RDD,新的RDD的數據依賴於原來的RDD的數據,每一個RDD又包含多個分區。那麼一段程序實際上就構造了一個由相互依賴的多個RDD組成的有向無環圖(DAG)。並經過在RDD上執行動做將這個有向無環圖做爲一個Job提交給Spark執行。理解RDD後能夠避免之後走不少彎路。關於RDD的特色,能夠搜到不少資料,其實咱們只須要理解兩點就能夠了:
  1.不可變
      2.分佈式
 
     有人會以爲很奇怪,若是RDD不可變,那麼在進行數據操做的時候,怎麼改變它的值,怎麼進行計算呢?其實RDD支持兩種操做
     1.Tansformation(轉化操做):返回值仍是一個RDD
     2.Action(行動操做):返回值不是一個RDD
 
     第一種Transformation是返回一個新的RDD,如map(),filter()等。這種操做是lazy(惰性)的,即從一個RDD轉換生成另外一個RDD的操做不是立刻執行,只是記錄下來,只有等到有Action操做是纔會真正啓動計算,將生成的新RDD寫到內存或hdfs裏,不會對原有的RDD的值進行改變。而Action操做纔會實際觸發Spark計算,對RDD計算出一個結果,並把結果返回到內存或hdfs中,如count(),first()等。
 
     通俗點理解的話,就是假設你寫了一堆程序,裏面對數據進行了屢次轉換,這個時候實際上沒有計算,就只是放着這裏。在最後出結果的時候會用到Action操做,這個時候Action會執行與之相關的轉換操做,運算速度會很是快(一是Action不必定須要調用全部的transformation操做,二是隻有在最後一步纔會計算相關的transformation操做)。若是Transformation沒有lazy性質的話,每轉換一次就要計算一次,最後Action操做的時候還要計算一次,會很是耗內存,也會極大下降計算速度。
 
     還有一種狀況,若是咱們想屢次使用同一個RDD,每次都對RDD進行Action操做的話,會極大的消耗Spark的內存,這種狀況下,咱們可使用RDD.persist()把這個RDD緩存下來,在內存不足時,能夠存儲到磁盤(disk)裏。在Python中,儲存的對象永遠是經過Pickle庫序列化過的,因此社不設置序列化級別不會產生影響。
 
     RDD的性質和操做方式講完了,如今來講說怎麼建立RDD,有兩種方式
     1.讀取一個外部數據集
     2.在內存中對一個集合進行並行化(parallelize)
 
     第二種方式相對來講更簡單,你能夠直接在shell裏快速建立RDD,舉個例子:
1 A = [1,2,3,4,5] 2 lines = sc.parallelize(A) 3 #另外一種方式
4 lines = sc.parallelize([1,2,3,4,5])
  
  可是這種方式並非很好,由於你須要把你的整個數據集放在內存裏,若是數據量比較大,會很佔內存。因此,能夠在測試的時候用這種方式,簡單快速。
     
  讀取外部數據及時須要用到SparkContext.textFile()
 
 1 lines = sc.textFile("README.md") 
 
  RDD的操做命令不少,包括map(),filter()等Transformation操做以及reduce(),fold(),aggregate()等Action操做。
  • 常見的Transformation操做:

   map( )和flatMap( )的聯繫和區別 緩存

map( ):接收一個函數,應用到RDD中的每一個元素,而後爲每一條輸入返回一個對象。

filter( ):接收一個函數,將函數的元素放入新的RDD中返回。

flatMap( ):接收一個函數,應用到RDD中的每一個元素,返回一個包含可迭代的類型(如list等)的RDD,能夠理解爲先Map(),後flat().

 

  
  用一個圖能夠很清楚的理解:
  
 
 
  僞集合操做:
 
1 distinct( )、union( )、intersection( )、subtract( )
2 distinct( ):去重
3 union( ):兩個RDD的並集
4 intersection( ):兩個RDD的交集
5 subtract( ):兩個RDD的補集
6 cartesian( ):兩個RDD的笛卡爾積(能夠應用於計算類似度中,如計算各用戶對各類產品的預期興趣程度)

注:
 
    
1.intersection( )的性能比union( )差不少,由於它須要數據混洗來發現共同數據
 
    
2.substract( )也須要數據混洗

 

  • 常見的Action操做:

  

1 reduce( ):接收一個函數做爲參數,這個函數要操做兩個相同元素類型的RDD,也返回一個一樣類型的RDD,能夠計算RDD中元素的和、個數、以及其餘聚合類型的操做。
2 
3 fold( ):和reduce同樣,但須要提供初始值。
4 
5 aggregate( ):和fold相似,但一般返回不一樣類型的函數。
6 
7 注:
關於fold()和aggregate(),再說點題外話。fold()只能作同構聚合操做,就是說,若是你有一個RDD[X],經過fold,你只能構造出一個X。可是若是你想經過RDD[X]構造一個Y呢?那就得用到aggregate()了,使用aggregate時,須要提供初始值(初始值的類型與最終返回的類型相同),而後經過一個函數把一RDD的元素合併起來放到累加器裏,再提供一個函數將累加器兩兩相加。由此能夠看出,fold()須要保證滅個partition可以獨立進行運算,而aggregate()對於不一樣partition(分區)提交的最終結果專門定義了一個函數來進行處理。

 

 

  RDD還有不少其餘的操做命令,譬如collect(),count(),take(),top(),countByValue(),foreach()等,限於篇幅,就不一一表述了。分佈式

 

  最後來說講如何Spark傳遞函數
  兩種方式:
  1.簡單的函數:lambda表達式
     適合比較短的函數,不支持多語句函數和無返回值的語句。
  2.def函數
     會將整個對象傳遞過去,可是最好不要傳遞一個帶字段引用的函數。若是你傳遞的對象是某個對象的成員,或者在某個函數中引用了一個整個字段,會報錯。舉個例子:
1 class MyClass(object): 2     def __init__(self): 3         self.field = 「Hello」 4 
5     def doStuff(self, rdd): 6         #報錯:由於在self.field中引用了整個self
7         return rdd.map(lambda s: self.field + x)

 

 解決方法:直接把你須要的字段拿出來放到一個局部變量裏,而後傳遞這個局部變量就能夠了。函數

 

1 class MyClass(object): 2     def __init__(self): 3         self.field = 「Hello」 4 
5     def doStuff(self, rdd): 6         #將須要的字段提取到局部變量中便可
7         field = self.field 8         return rdd.map(lambda s: field + x)

  

  前面三章講了Spark的基本概念和RDD的特性以及一些簡單的命令,比較簡單。後面三章主要講了鍵值對操做、數據的讀取和保存以及累加器、廣播變量等,下週再更新。性能

相關文章
相關標籤/搜索