spark中的pair rdd,看這一篇就夠了

本文始發於我的公衆號:TechFlow,原創不易,求個關注web


今天是spark專題的第四篇文章,咱們一塊兒來看下Pair RDD。api

定義

在以前的文章當中,咱們已經熟悉了RDD的相關概念,也瞭解了RDD基本的轉化操做和行動操做。今天咱們來看一下RDD當中很是常見的PairRDD,也叫作鍵值對RDD,能夠理解成KVRDD。數組

KV很好理解,就是key和value的組合,好比Python當中的dict或者是C++以及Java當中的map中的基本元素都是鍵值對。相比於以前基本的RDD,pariRDD能夠支持更多的操做,相對來講更加靈活,能夠完成更加複雜的功能。好比咱們能夠根據key進行聚合,或者是計算交集等。網絡

因此自己pairRDD只不過是數據類型是KV結構的RDD而已,並無太多的內涵,你們不須要擔憂。app

Pair RDD轉化操做

Pair RDD也是RDD,因此以前介紹的RDD的轉化操做Pair RDD天然也可使用。它們二者有些像是類繼承的關係,RDD是父類,Pair RDD是實現了一些新特性的子類。子類能夠調用父類當中全部的方法,可是父類卻不能調用子類中的方法。編輯器

調用的時候須要注意,因爲咱們的Pair RDD中的數據格式是KV的二元組,因此咱們傳入的函數必須是針對二元組數據的,否則的話可能運算的結果會有問題。下面咱們來列舉一些最經常使用的轉化操做。函數

爲了方便演示,咱們用一個固定的RDD來運行各類轉化操做,來直觀瞭解一下這些轉化操做究竟起什麼樣的做用。spa

ex1 = sc.parallelize([[12], [34], [35]])

keys,values和sortByKey

這三個轉化操做應該是最經常使用也是最簡單的,簡單到咱們經過字面意思就能夠猜出它們的意思。3d

咱們先來看keys和values:code

咱們的RDD當中二元組當中的第一個元素會被當作key,第二個元素當作value,須要注意的是,它並非一個map或者是dict,因此key和value都是能夠重複的

sortByKey也很直觀,咱們從字面意思就看得出來是對RDD當中的數據根據key值進行排序,一樣,咱們也來看下結果:

mapValues和flatMapValues

mapValues不能直接使用,而必需要傳入一個函數做爲參數。它的意思是對全部的value執行這個函數,好比咱們想把全部的value所有轉變成字符串,咱們能夠這麼操做:

flatMapValues的操做和咱們的認知有些相反,咱們都知道flatMap操做是能夠將一個嵌套的數組打散,可是咱們怎麼對一個value打散嵌套呢?畢竟咱們的value不必定就是一個數組,這就要說到咱們傳入的函數了,這個flatMap的操做實際上是針對函數返回的結果的,也就是說函數會返回一個迭代器,而後打散的內容實際上是這個迭代器當中的值。

我這麼表述可能有些枯燥,咱們來看一個例子就明白了:

不知道這個結果有沒有出乎你們的意料,它的整個流程是這樣的,咱們調用flatMapValues運算以後返回一個迭代器,迭代器的內容是range(x, x+3)。實際上是每個key對應一個這樣的迭代器,以後再將迭代器當中的內容打散,和key構成新的pair。

groupByKey,reduceByKey和foldByKey

這兩個功能也比較接近,咱們先說第一個,若是學過SQL的同窗對於group by操做的含義應該很是熟悉。若是沒有了解過也沒有關係,group by能夠簡單理解成歸併或者是分桶。也就是說將key值相同的value歸併到一塊兒,獲得的結果是key-list的Pair RDD,也就是咱們把key值相同的value放在了一個list當中。

咱們也來看下例子:

咱們調用完groupby以後獲得的結果是一個對象,因此須要調用一下mapValues將它轉成list纔可使用,不然的話是不能使用collect獲取的。

reduceByKey和groupByKey相似,只不過groupByKey只是歸併到一塊兒,然而reduceByKey是傳入reduce函數,執行reduce以後的結果。咱們來看一個例子:

在這個例子當中咱們執行了累加,把key值相同的value加在了一塊兒。

foldByKey和fold的用法差異並不大,惟一不一樣的是咱們加上了根據key值聚合的邏輯。若是咱們把分區的初始值設置成0的話,那麼它用起來和reduceByKey幾乎沒有區別:

咱們只須要清楚foldByKey當中的初始值針對的是分區便可。

combineByKey

這個也是一個很核心而且不太容易理解的轉化操做,咱們先來看它的參數,它一共接受5個參數。咱們一個一個來講,首先是第一個參數,是createCombiner

它的做用是初始化,將value根據咱們的須要作初始化,好比將string類型的轉化成int,或者是其餘的操做。咱們用記號能夠寫成是V => C,這裏的V就是value,C是咱們初始化以後的新值。

它會和value一塊兒被當成新的pair傳入第二個函數,因此第二個函數的接受參數是(C, V)的二元組。咱們要作的是定義這個二元組的合併,因此第二個函數能夠寫成(C, V) => C。源碼裏的註釋和網上的教程都是這麼寫的,但我以爲因爲出現了兩個C,可能會讓人難以理解,我以爲能夠寫成(C, V) => D,比較好。

最後一個函數是將D進行合併,因此它能夠寫成是(D, D) => D。

到這裏咱們看似好像明白了它的原理,可是又好像有不少問號,總以爲哪裏有些不太對勁。我想了好久,才找到了問題的根源,出在哪裏呢,在於合併。有沒有發現第二個函數和第三個函數都是用來合併的,爲何咱們要合併兩次,它們之間的區別是什麼?若是這個問題沒搞明白,那麼對於它的使用必定是錯誤的,我我的以爲這個問題纔是這個轉化操做的核心,沒講清楚這個問題的博客都是不夠清楚的。

其實這兩次合併的邏輯大同小異,可是合併的範圍不同,第一次合併是針對分區的,第二次合併是針對key的。由於在spark當中數據可能不止存放在一個分區內,因此咱們要合併兩次,第一次先將分區內部的數據整合在一塊兒,第二次再跨分區合併。因爲不一樣分區的數據可能相隔很遠,因此會致使網絡傳輸的時間過長,因此咱們但願傳輸的數據儘可能小,這纔有了groupby兩次的緣由。

咱們再來看一個例子:

在這個例子當中咱們計算了每一個單詞出現的平均個數,咱們一點一點來看。首先,咱們第一個函數將value轉化成了(1, value)的元組,元組的第0號元素表示出現該單詞的文檔數,第1號元素表示文檔內出現的次數。因此第二個函數,也就是在分組內聚合的函數,咱們對於出現的文檔數只須要加一便可,對於出現的次數要進行累加。由於這一次聚合的對象都是(1, value)類型的元素,也就是沒有聚合以前的結果。

在第三個函數當中,咱們對於出現的總數也進行累加,是由於這一個函數處理的結果是各個分區已經聚合一次的結果了。好比apple在一個分區內出如今了兩個文檔內,一共出現了20次,在一個分區出如今了三個文檔中,一共出現了30次,那麼顯然咱們一共出如今了5個文檔中,一共出現了50次。

因爲咱們要計算平均,因此咱們要用出現的總次數除以出現的文檔數。最後通過map以後因爲咱們獲得的仍是一個二元組,咱們不能直接collect,須要用collectAsMap。

咱們把上面這個例子用圖來展現,會很容易理解:

鏈接操做

在spark當中,除了基礎的轉化操做以外,spark還提供了額外的鏈接操做給pair RDD。經過鏈接,咱們能夠很方便地像是操做集合同樣操做RDD。操做的方法也很是簡單,和SQL當中操做數據表的形式很像,就是join操做。join操做又能夠分爲join(inner join)、left join和right join。

若是你熟悉SQL的話,想必這三者的區別應該很是清楚,它和SQL當中的join是同樣的。若是不熟悉也沒有關係,解釋起來並不複雜。在join的時候咱們每每是用一張表去join另一張表,就好像兩個數相減,咱們用一個數減去另一個數同樣。好比A.join(B),咱們把A叫作左表,B叫作右表。所謂的join,就是把兩張表當中某一個字段或者是某些字段值相同的行鏈接在一塊兒。

好比一張表是學生表,一張表是出勤表。咱們兩張表用學生的id一關聯,就獲得了學生的出勤記錄。可是既然是集合關聯,就會出現數據關聯不上的狀況。好比某個學生沒有出勤,或者是出勤表裏記錯了學生id。對於數據關聯不上的狀況,咱們的處理方式有四種。第一種是全都丟棄,關聯不上的數據就不要了。第二種是所有保留,關聯不上的字段就記爲NULL。第三種是左表關聯不上的保留,右表丟棄。第四種是右表保留,左表丟棄。

下圖展現了這四種join,很是形象。

咱們看幾個實際的例子來體會一下。

首先建立數據集:

ex1 = sc.parallelize([['frank'30], ['bob'9], ['silly'3]])
ex2 = sc.parallelize([['frank'80], ['bob'12], ['marry'22], ['frank'21], ['bob'22]])

接着,咱們分別運行這四種join,觀察一下join以後的結果。

從結果當中咱們能夠看到,若是兩個數據集當中都存在多條key值相同的數據,spark會將它們兩兩相乘匹配在一塊兒。

行動操做

最後,咱們看下pair RDD的行動操做。pair RDD一樣是rdd,因此普通rdd適用的行動操做,一樣適用於pair rdd。可是除此以外,spark還爲它開發了獨有的行動操做。

countByKey

countByKey這個操做顧名思義就是根據Key值計算每一個Key值出現的條數,它等價於count groupby的SQL語句。咱們來看個具體的例子:

collectAsMap

這個也很好理解,其實就是講最後的結果以map的形式輸出

從返回的結果能夠看到,輸出的是一個dict類型。也就是Python當中的"map"。

lookup

這個單詞看起來比較少見,其實它表明的是根據key值查找對應的value的意思。也就是經常使用的get函數,咱們傳入一個key值,會自動返回key值對應的全部的value。若是有多個value,則會返回list。

總結

到這裏,全部的pair RDD相關的操做就算是介紹完了。pair rdd在咱們平常的使用當中出現的頻率很是高,利用它能夠很是方便地實現一些比較複雜的操做。

另外,今天的這篇文章內容很多,想要徹底吃透,須要一點功夫。這不是看一篇文章就能夠實現的,可是也沒有關係,咱們初學的時候只須要對這些api和使用方法有一個大概的印象便可,具體的使用細節能夠等用到的時候再去查閱相關的資料。

今天的文章就是這些,若是以爲有所收穫,請順手點個關注或者轉發吧,大家的舉手之勞對我來講很重要。

相關文章
相關標籤/搜索