需求
假設咱們有一張各個產品線URL的訪問記錄表,該表僅僅有兩個字段:product、url,咱們須要統計各個產品線下訪問次數前10的URL是哪些?
解決方案
(1)模擬訪問記錄數據
模擬數據記錄共有1000條,其中包括10個產品線:product一、product二、…、product10,100個URL:url一、url二、…、url100,爲了簡化生成數據的過程,產品線和URL均使用了隨機數。一條記錄爲一個字符串,產品線與URL使用空格進行分隔。模擬數據存儲在一個名爲「data」的列表中,經過parallelize的方式造成一個RDD:table,再使用inferSchema的方式註冊爲一張臨時表「product_url」。
表「product_url」的示例數據以下:
(2)統計各個產品線下各個URL的訪問次數
這個邏輯使用Spark SQL便可以實現,示例數據以下:
能夠看出,數據多出了一個字段access,用於表示某產品線下某個URL的訪問次數。
此外,若是咱們有一個數據類型爲Row的變量row,能夠經過row.product、row.url、row.access或者row[0]、row[1]、row[2]訪問product、url、access對應的數據。
(3)「分區排序取值」
咱們的統計需求有一個明顯的分界線:產品線,Top N的處理邏輯能夠轉變爲:
a. 根據分界線作分區,即每個產品線的記錄進入同一個分區;
b. 每個分區(產品線)內根據URL的訪問次數(access)排序(降序);
c. 每個分區(產品線)內取前N條數據便可。
a、b實際就是一個「分區排序」的過程,Spark RDD也爲「分區排序」提供了很是方便的API:repartitionAndSortWithinPartitions,可是該函數須要傳入的數據類型要求爲(key, value),所以咱們須要對(2)中的數據作一下簡單的處理:
其實就是將數據類型Row映射爲元組(Row, None),示例數據以下:
此外repartitionAndSortWithinPartitions還須要兩個函數:partitionFunc、keyFunc,這兩個函數都須要接收一個參數,該參數爲(key, value)中的key。
partitionFunc用於根據(key, value)中的key如何選取分區,返回值要求爲整型,數值即爲分區號,即0表示分區0,1表示分區1,…。
這裏key的數據類型即爲Row。
由於咱們模擬了10個產品線,每個產品線的數據須要被劃分到同一個分區內,所以咱們也須要10個分區(分區序號爲0—9),根據產品線劃分分區的規則爲:產品線product1的分區爲0,產品線product2的分區爲1,…,產品線product10的分區爲9。其中key[0]爲產品線名稱,key[0][7]爲產品線名稱中的隨機數,將key[0][7]轉換爲整數並減一便可獲得對應的分區號。
keyFunc用於根據(key, value)中的key如何排序,「分區內排序」時即根據該函數的返回值進行排序。
其中,key[2]爲訪問次數access,咱們就是須要在某個分區(產品線)內根據URL的訪問次數作排序。
函數partitionFunc、keyFunc準備好以後,咱們能夠開始調用repartitionAndSortWithinPartitions:
須要注意的是,numPartitions值爲10,該值取決於分區(產品線)的個數;ascending值爲False,該值表示分區內排序時使用降序。
「分區排序」以後咱們便可以開始「取值」,取值的過程比較簡單:在每一個分區內即前N(這裏假設爲10,即top 10)個值,將這些值「彙總」以後便可得出各個產品線下URL訪問次數的Top 10。
考慮到咱們須要「彙總」的需求,所以不能使用foreachPartition,須要經過mapPartitions實現,它須要一個函數:f,函數f的參數爲一個「迭代器」,經過這個「迭代器」能夠遍歷分區內的全部數據。
從上面的代碼能夠看出,咱們就是經過「迭代器」iter獲取分區內的前10條數據(若是分區內的數據條數大於或等於10的話)。
「彙總」(collect)結果:
rows中保存着各個產品線下URL訪問次數的Top 10記錄。
(4)結果處理
計算完成以後,咱們能夠對結果進行一些處理,如:根據產品線、URL根據字典序排序並輸出,代碼以下:
示例數據:
總結
使用Spark解決Top N問題時,只須要通過「劃分分區、分區內排序、分區內取值」三個過程便可完成。