本文始發於我的公衆號:TechFlow,原創不易,求個關注web
今天是spark第三篇文章,咱們繼續來看RDD的一些操做。數組
咱們前文說道在spark當中RDD的操做能夠分爲兩種,一種是轉化操做(transformation),另外一種是行動操做(action)。在轉化操做當中,spark不會爲咱們計算結果,而是會生成一個新的RDD節點,記錄下這個操做。只有在行動操做執行的時候,spark纔會從頭開始計算整個計算。緩存
而轉化操做又能夠進一步分爲針對元素的轉化操做以及針對集合的轉化操做。框架
針對元素的轉化操做很是經常使用,其中最經常使用的就是map和flatmap。從名字上看這二者都是map操做,map操做咱們都知道,在以前的MapReduce文章以及Python map、reduce用法的文章當中都有說起。簡而言之就是能夠將一個操做映射在每個元素上。編輯器
好比假設咱們有一個序列[1, 3, 4, 7],咱們但願將當中每個元素執行平方操做。咱們固然能夠用for循環執行,可是在spark當中更好的辦法是使用map。分佈式
nums = sc.parallelize([1, 3, 4, 7])
spuare = nums.map(lambda x: x * x)
咱們知道map是一個轉化操做,因此square仍然是一個RDD,咱們直接將它輸出不會獲得結果,只會獲得RDD的相關信息:函數
內部RDD的轉化圖是這樣的:性能
咱們想看結果就必需要執行行動操做,好比take,咱們take一下查看一下結果:學習
和咱們的預期一致,對於以前一直關注的同窗來講map操做應該已經很熟悉了,那麼這個flatmap又是什麼呢?優化
差異就在這個flat,咱們都知道flat是扁平的意思,因此flatmap就是說map執行以後的結果扁平化。說白了也就是說若是map執行以後的結果是一個數組的話,那麼會將數組拆開,把裏面的內容拿出來組合到一塊兒。
咱們一塊兒來看一個例子:
texts = sc.parallelize(['now test', 'spark rdd'])
split = texts.map(lambda x: x.split(' '))
因爲咱們執行map的對象是一個字符串,一個字符串執行split操做以後會獲得一個字符串數組。若是咱們執行map,獲得的結果會是:
若是咱們執行flatmap呢?咱們也能夠試一下:
對比一下,有沒有注意到差異?
是了,map執行的結果是一個array的array,由於每個string split以後就是一個array,咱們把array拼接到一塊兒天然是一個array的array。而flatMap會把這些array攤平以後放在一塊兒,這也是二者最大的差異。
上面介紹了針對元素的轉化操做,下面來看看針對集合的轉化操做。
針對集合的操做大概有union,distinct,intersection和subtract這幾種。咱們能夠先看下下圖有一個直觀地感覺,以後咱們再一一分析:
首先來看distinct,這個顧名思義,就是去除重複。和SQL當中的distinct是同樣的,這個操做的輸入是兩個集合RDD,執行以後會生成一個新的RDD,這個RDD當中的全部元素都是unique的。有一點須要注意,執行distinct的開銷很大,由於它會執行shuffle操做將全部的數據進行亂序,以確保每一個元素只有一份。若是你不明白shuffle操做是什麼意思,沒有關係,咱們在後序的文章當中會着重講解。只須要記住它的開銷很大就好了。
第二種操做是union,這個也很好理解,就是把兩個RDD當中的全部元素合併。你能夠把它當成是Python list當中的extend操做,一樣和extend同樣,它並不會作重複元素的檢測,因此若是合併的兩個集合當中有相同的元素並不會被過濾,而是會被保留。
第三個操做是intersection,它的意思是交集,也就是兩個集合重疊的部分。這個應該蠻好理解的,咱們看下下圖:
下圖當中藍色的部分,也就是A和B兩個集合的交集部分就是A.intersection(B)的結果,也就是兩個集合當中共有的元素。一樣,這個操做也會執行shuffle,因此開銷同樣很大,而且這個操做會去掉重複的元素。
最後一個是subtract,也就是差集,就是屬於A不屬於B的元素,一樣咱們能夠用圖來表示:
上圖當中灰色陰影部分就是A和B兩個集合的差集,一樣,這個操做也會執行shuffle,很是耗時。
除了以上幾種以外,還有cartesian,即笛卡爾積,sample抽樣等集合操做,不過相對而言用的稍微少一些,這裏就不過多介紹了,感興趣的同窗能夠了解一下,也並不複雜。
RDD中最經常使用的行動操做應該就是獲取結果的操做了,畢竟咱們算了半天就是爲了拿結果,只獲取RDD顯然不是咱們的目的。獲取結果的RDD主要是take,top和collect,這三種沒什麼特別的用法,簡單介紹一下。
其中collect是獲取全部結果,會返回全部的元素。take和top都須要傳入一個參數指定條數,take是從RDD中返回指定條數的結果,top是從RDD中返回最前面的若干條結果,top和take的用法徹底同樣,惟一的區別就是拿到的結果是不是最前面的。
除了這幾個以外,還有一個很經常使用的action是count,這個應該也不用多說,計算數據條數的操做,count一下就能夠知道有多少條數據了。
除了這些比較簡單的以外,再介紹另外兩個比較有意思的,首先,先來介紹reduce。reduce顧名思義就是MapReduce當中的reduce,它的用法和Python當中的reduce幾乎徹底同樣,它接受一個函數來進行合併操做。咱們來看個例子:
在這個例子當中,咱們的reduce函數是將兩個int執行加和,reduce機制會重複執行這個操做將全部的數據合併,因此最終獲得的結果就是1 + 3 + 4 + 7 = 15.
除了reduce以外還有一個叫作fold的action,它和reduce徹底同樣,惟一不一樣的是它能夠自定義一個初始值,而且是針對分區的,咱們還拿上面的例子舉例:
直接看這個例子可能有點懵逼,簡單解釋一下就明白了,其實不復雜。咱們注意到咱們在使用parallelize創造數據的時候多加了一個參數2,這個2表示分區數。簡單能夠理解成數組[1, 3, 4, 7]會被分紅兩部分,可是咱們直接collect的話仍是原值。
如今咱們使用fold,傳入了兩個參數,除了一個函數以外還傳入了一個初始值2。因此整個計算過程是這樣的:
對於第一個分區的答案是1 + 3 + 2 = 6,對於第二個分區的答案是4 + 7 + 2 = 13,最後將兩個分區合併:6 + 13 + 2 = 21。
也就是說咱們對於每一個分區的結果賦予了一個起始值,而且對分區合併以後的結果又賦予了一個起始值。
老實講這個action是最難理解的,由於它比較反常。首先,對於reduce和fold來講都有一個要求就是返回值的類型必須和rdd的數據類型相同。好比數據的類型是int,那麼返回的結果也要是int。
可是對於有些場景這個是不適用的,好比咱們想求平均,咱們須要知道term的和,也須要知道term出現的次數,因此咱們須要返回兩個值。這個時候咱們初始化的值應該是0, 0,也就是對於加和與計數而言都是從0開始的,接着咱們須要傳入兩個函數,好比寫成這樣:
nums.aggregate((0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))
看到這行代碼會懵逼是必然的,不用擔憂,咱們一點一點解釋。
首先是第一個lambda函數,這裏的x不是一個值而是兩個值,或者說是一個二元組,也就是咱們最後返回的結果,在咱們的返回預期裏,第一個返回的數是nums的和,第二個返回的數是nums當中數的個數。而這裏的y則是nums輸入的結果,顯然nums輸入的結果只有一個int,因此這裏的y是一維的。那麼咱們要求和固然是用x[0] + y,也就是說把y的值加在第一維上,第二維天然是加一,由於咱們每讀取一個數就應該加一。
這點還比較容易理解,第二個函數可能有些費勁,第二個函數和第一個不一樣,它不是用在處理nums的數據的,而是用來處理分區的。當咱們執行aggregate的時候,spark並非單線程執行的,它會將nums中的數據拆分紅許多分區,每一個分區獲得結果以後須要合併,合併的時候會調用這個函數。
和第一個函數相似,第一個x是最終結果,而y則是其餘分區運算結束須要合併進來的值。因此這裏的y是二維的,第一維是某個分區的和,第二維是某個分區當中元素的數量,那麼咱們固然要把它都加在x上。
上圖展現了兩個分區的時候的計算過程,其中lambda1就是咱們傳入的第一個匿名函數,同理,lambda2就是咱們傳入的第二個匿名函數。我想結合圖應該很容易看明白。
行動操做除了這幾個以外還有一些,因爲篇幅緣由咱們先不贅述了,在後序的文章當中若是有出現,咱們會再進行詳細解釋的。初學者學習spark比較抗拒的一個主要緣由就是以爲太過複雜,就連操做還區分什麼轉化操做和行動操做。其實這一切都是爲了惰性求值從而優化性能。這樣咱們就能夠把若干個操做合併在一塊兒執行,從而減小消耗的計算資源,對於分佈式計算框架而言,性能是很是重要的指標,理解了這一點,spark爲何會作出這樣的設計也就很容易理解了。
不只spark如此,TensorFlow等深度學習框架也是如此,本質上許多看似反直覺的設計都是有更深層的緣由的,理解了以後其實也很容易猜到,凡是拿到最終結果的操做每每都是行動操做,若是隻是一些計算,那麼十有八九是轉化操做。
Spark當中的RDD是惰性求值的,有的時候咱們會但願屢次使用同一個RDD。若是咱們只是簡單地調用行動操做,那麼spark會屢次重複計算RDD和它對應的全部數據以及其餘依賴,這顯然會帶來大量開銷。咱們很天然地會但願對於咱們常用的RDD能夠緩存起來,在咱們須要的時候隨時拿來用,而不是每次用到的時候都須要從新跑。
爲了解決這個問題,spark當中提供了持久化的操做。所謂的持久化能夠簡單理解成緩存起來。用法也很簡單,咱們只須要對RDD進行persist便可:
texts = sc.parallelize(['now test', 'hello world'])
split = texts.split(lambda x: x.split(' '))
split.persist()
調用完持久化以後,RDD會被緩存進內存或磁盤當中,咱們須要的時候能夠隨時調出來使用,就不用把前面的整個流程所有跑一遍了。而且spark當中支持多種級別的持久化操做,咱們能夠經過StorageLevel的變量來控制。咱們來看下這個StorageLevel的取值:
咱們根據須要選擇對應的緩存級別便可。固然既然有持久化天然就有反持久化,對於一些已經再也不須要緩存的RDD,咱們能夠調用unpersist將它們從緩存當中去除。
今天的內容雖然看起來各類操做五花八門,可是有些並非常常用到,咱們只須要大概有個印象,具體操做的細節能夠等用到的時候再作仔細的研究。但願你們都能忽略這些並不重要的細節,抓住核心的本質。
今天的文章就是這些,若是以爲有所收穫,請順手點個關注或者轉發吧,大家的舉手之勞對我來講很重要。