MapReduce 模式、算法和用例

本文另外一地址請見  MapReduce 模式、算法和用例 html

本文譯自 Mapreduce Patterns, Algorithms, and Use Cases node


在這篇文章裏總結了幾種網上或者論文中常見的MapReduce模式和算法,並系統化的解釋了這些技術的不一樣之處。全部描述性的文字和代碼都使用了標準hadoop的MapReduce模型,包括Mappers, Reduces, Combiners, Partitioners,和 sorting。以下圖所示。 算法

基本MapReduce模式

計數與求和

問題陳述:  數組

有許多文檔,每一個文檔都有一些字段組成。須要計算出每一個字段在全部文檔中的出現次數或者這些字段的其餘什麼統計值。例如,給定一個log文件,其中的每條記錄都包含一個響應時間,須要計算出平均響應時間。 緩存

解決方案: 網絡

讓咱們先從簡單的例子入手。在下面的代碼片斷裏,Mapper每遇到指定詞就把頻次記1,Reducer一個個遍歷這些詞的集合而後把他們的頻次加和。 app

  
1 class Mapper 2 method Map(docid id, doc d) 3 for all term t in doc d do 4 Emit(term t, count 1 ) 5 6 class Reducer 7 method Reduce(term t, counts [c1, c2,...]) 8 sum = 0 9 for all count c in [c1, c2,...] do 10 sum = sum + c 11 Emit(term t, count sum)

這種方法的缺點顯而易見,Mapper提交了太多無心義的計數。它徹底能夠經過先對每一個文檔中的詞進行計數從而減小傳遞給Reducer的數據量: 框架

  
1 class Mapper 2 method Map(docid id, doc d) 3 H = new AssociativeArray 4 for all term t in doc d do 5 H{t} = H{t} + 1 6 for all term t in H do 7 Emit(term t, count H{t})

若是要累計計數的的不僅是單個文檔中的內容,還包括了一個Mapper節點處理的全部文檔,那就要用到Combiner了: 機器學習

  
1 class Mapper 2 method Map(docid id, doc d) 3 for all term t in doc d do 4 Emit(term t, count 1 ) 5 6 class Combiner 7 method Combine(term t, [c1, c2,...]) 8 sum = 0 9 for all count c in [c1, c2,...] do 10 sum = sum + c 11 Emit(term t, count sum) 12 13 class Reducer 14 method Reduce(term t, counts [c1, c2,...]) 15 sum = 0 16 for all count c in [c1, c2,...] do 17 sum = sum + c 18 Emit(term t, count sum)
應用:Log 分析, 數據查詢

整理歸類

問題陳述: 分佈式

有一系列條目,每一個條目都有幾個屬性,要把具備同一屬性值的條目都保存在一個文件裏,或者把條目按照屬性值分組。 最典型的應用是倒排索引。

解決方案:

解決方案很簡單。 在 Mapper 中以每一個條目的所需屬性值做爲 key,其自己做爲值傳遞給 Reducer。 Reducer 取得按照屬性值分組的條目,而後能夠處理或者保存。若是是在構建倒排索引,那麼 每一個條目至關於一個詞而屬性值就是詞所在的文檔ID。

應用:倒排索引, ETL

過濾 (文本查找),解析和校驗

問題陳述:

假設有不少條記錄,須要從其中找出知足某個條件的全部記錄,或者將每條記錄傳換成另一種形式(轉換操做相對於各條記錄獨立,即對一條記錄的操做與其餘記錄無關)。像文本解析、特定值抽取、格式轉換等都屬於後一種用例。

解決方案:

很是簡單,在Mapper 裏逐條進行操做,輸出須要的值或轉換後的形式。

應用:日誌分析,數據查詢,ETL,數據校驗

分佈式任務執行

問題陳述:

大型計算能夠分解爲多個部分分別進行而後合併各個計算的結果以得到最終結果。

解決方案:  將數據切分紅多份做爲每一個 Mapper 的輸入,每一個Mapper處理一份數據,執行一樣的運算,產生結果,Reducer把多個Mapper的結果組合成一個。

案例研究: 數字通訊系統模擬

像 WiMAX 這樣的數字通訊模擬軟件經過系統模型來傳輸大量的隨機數據,而後計算傳輸中的錯誤概率。 每一個 Mapper 處理樣本 1/N  的數據,計算出這部分數據的錯誤率,而後在 Reducer 裏計算平均錯誤率。

應用:工程模擬,數字分析,性能測試

排序

問題陳述:

有許多條記錄,須要按照某種規則將全部記錄排序或是按照順序來處理記錄。

解決方案: 簡單排序很好辦 – Mappers 將待排序的屬性值爲鍵,整條記錄爲值輸出。 不過實際應用中的排序要更加巧妙一點, 這就是它之因此被稱爲MapReduce 核心的緣由(「核心」是說排序?由於證實Hadoop計算能力的實驗是大數據排序?仍是說Hadoop的處理過程當中對key排序的環節?)。在實踐中,經常使用組合鍵來實現二次排序和分組。

MapReduce 最初只可以對鍵排序, 可是也有技術利用能夠利用Hadoop 的特性來實現按值排序。想了解的話能夠看 這篇博客

按照BigTable的概念,使用 MapReduce來對最初數據而非中間數據排序,也即保持數據的有序狀態更有好處,必須注意這一點。換句話說,在數據插入時排序一次要比在每次查詢數據的時候排序更高效。

應用:ETL,數據分析

非基本 MapReduce 模式

迭代消息傳遞 (圖處理)

問題陳述:

假設一個實體網絡,實體之間存在着關係。 須要按照與它比鄰的其餘實體的屬性計算出一個狀態。這個狀態能夠表現爲它和其它節點之間的距離, 存在特定屬性的鄰接點的跡象, 鄰域密度特徵等等。

解決方案:

網絡存儲爲系列節點的結合,每一個節點包含有其全部鄰接點ID的列表。按照這個概念,MapReduce 迭代進行,每次迭代中每一個節點都發消息給它的鄰接點。鄰接點根據接收到的信息更新本身的狀態。當知足了某些條件的時候迭代中止,如達到了最大迭代次數(網絡半徑)或兩次連續的迭代幾乎沒有狀態改變。從技術上來看,Mapper 以每一個鄰接點的ID爲鍵發出信息,全部的信息都會按照接受節點分組,reducer 就可以重算各節點的狀態而後更新那些狀態改變了的節點。下面展現了這個算法:

  
1 class Mapper 2 method Map(id n, object N) 3 Emit(id n, object N) 4 for all id m in N.OutgoingRelations do 5 Emit(id m, message getMessage(N)) 6 7 class Reducer 8 method Reduce(id m, [s1, s2,...]) 9 M = null 10 messages = [] 11 for all s in [s1, s2,...] do 12 if IsObject(s) then 13 M = s 14 else // s is a message 15 messages.add(s) 16 M.State = calculateState(messages) 17 Emit(id m, item M)

一個節點的狀態能夠迅速的沿着網絡傳遍全網,那些被感染了的節點又去感染它們的鄰居,整個過程就像下面的圖示同樣:

案例研究: 沿分類樹的有效性傳遞

問題陳述:

這個問題來自於真實的電子商務應用。將各類貨物分類,這些類別能夠組成一個樹形結構,比較大的分類(像男人、女人、兒童)能夠再分出小分類(像男褲或女裝),直到不能再分爲止(像男式藍色牛仔褲)。這些不能再分的基層類別能夠是有效(這個類別包含有貨品)或者已無效的(沒有屬於這個分類的貨品)。若是一個分類至少含有一個有效的子分類那麼認爲這個分類也是有效的。咱們須要在已知一些基層分類有效的狀況下找出分類樹上全部有效的分類。

解決方案:

這個問題能夠用上一節提到的框架來解決。咱們咋下面定義了名爲 getMessage和 calculateState 的方法:

  
1 class N 2 State in {True = 2 , False = 1 , null = 0 }, 3 initialized 1 or 2 for end - of - line categories, 0 otherwise 4 method getMessage( object N) 5 return N.State 6 method calculateState(state s, data [d1, d2,...]) 7 return max( [d1, d2,...] )

案例研究:廣度優先搜索

問題陳述須要計算出一個圖結構中某一個節點到其它全部節點的距離。

解決方案: Source源節點給全部鄰接點發出值爲0的信號,鄰接點把收到的信號再轉發給本身的鄰接點,每轉發一次就對信號值加1:

  
1 class N 2 State is distance, 3 initialized 0 for source node, INFINITY for all other nodes 4 method getMessage(N) 5 return N.State + 1 6 method calculateState(state s, data [d1, d2,...]) 7 min( [d1, d2,...] )

案例研究:網頁排名和 Mapper 端數據聚合

這個算法由Google提出,使用權威的PageRank算法,經過鏈接到一個網頁的其餘網頁來計算網頁的相關性。真實算法是至關複雜的,可是核心思想是權重能夠傳播,也即經過一個節點的各聯接節點的權重的均值來計算節點自身的權重。

  
1 class N 2 State is PageRank 3 method getMessage( object N) 4 return N.State / N.OutgoingRelations.size() 5 method calculateState(state s, data [d1, d2,...]) 6 return ( sum([d1, d2,...]) )

要指出的是上面用一個數值來做爲評分其實是一種簡化,在實際狀況下,咱們須要在Mapper端來進行聚合計算得出這個值。下面的代碼片斷展現了這個改變後的邏輯 (針對於 PageRank 算法):

  
1 class Mapper 2 method Initialize 3 H = new AssociativeArray 4 method Map(id n, object N) 5 p = N.PageRank / N.OutgoingRelations.size() 6 Emit(id n, object N) 7 for all id m in N.OutgoingRelations do 8 H{m} = H{m} + p 9 method Close 10 for all id n in H do 11 Emit(id n, value H{n}) 12 13 class Reducer 14 method Reduce(id m, [s1, s2,...]) 15 M = null 16 p = 0 17 for all s in [s1, s2,...] do 18 if IsObject(s) then 19 M = s 20 else 21 p = p + s 22 M.PageRank = p 23 Emit(id m, item M)
應用:圖分析,網頁索引

 

值去重 (對惟一項計數)

問題陳述: 記錄包含值域F和值域 G,要分別統計相同G值的記錄中不一樣的F值的數目 (至關於按照 G分組).

這個問題能夠推而廣之應用於分面搜索(某些電子商務網站稱之爲Narrow Search)

Record 1: F=1, G={a, b}
  Record 2: F=2, G={a, d, e}
  Record 3: F=1, G={b}
  Record 4: F=3, G={a, b}

  Result:
  a -> 3 // F=1, F=2, F=3
  b -> 2 // F=1, F=3
  d -> 1 // F=2
  e -> 1 // F=2

解決方案 I:

第一種方法是分兩個階段來解決這個問題。第一階段在Mapper中使用F和G組成一個複合值對,而後在Reducer中輸出每一個值對,目的是爲了保證F值的惟一性。在第二階段,再將值對按照G值來分組計算每組中的條目數。

第一階段:

  
1 class Mapper 2 method Map( null , record [value f, categories [g1, g2,...]]) 3 for all category g in [g1, g2,...] 4 Emit(record [g, f], count 1 ) 5 6 class Reducer 7 method Reduce(record [g, f], counts [n1, n2, ...]) 8 Emit(record [g, f], null )

第二階段:

  
1 class Mapper 2 method Map(record [f, g], null ) 3 Emit(value g, count 1 ) 4 5 class Reducer 6 method Reduce(value g, counts [n1, n2,...]) 7 Emit(value g, sum( [n1, n2,...] ) )

解決方案 II:

第二種方法只須要一次MapReduce 便可實現,但擴展性不強。算法很簡單-Mapper 輸出值和分類,在Reducer裏爲每一個值對應的分類去重而後給每一個所屬的分類計數加1,最後再在Reducer結束後將全部計數加和。這種方法適用於只有有限個分類,並且擁有相同F值的記錄不是不少的狀況。例如網絡日誌處理和用戶分類,用戶的總數不少,可是每一個用戶的事件是有限的,以此分類獲得的類別也是有限的。值得一提的是在這種模式下能夠在數據傳輸到Reducer以前使用Combiner來去除分類的重複值。

  
1 class Mapper 2 method Map( null , record [value f, categories [g1, g2,...] ) 3 for all category g in [g1, g2,...] 4 Emit(value f, category g) 5 6 class Reducer 7 method Initialize 8 H = new AssociativeArray : category -> count 9 method Reduce(value f, categories [g1, g2,...]) 10 [g1 ' , g2 ' ,..] = ExcludeDuplicates( [g1, g2,..] ) 11 for all category g in [g1 ' , g2 ' ,...] 12 H{g} = H{g} + 1 13 method Close 14 for all category g in H do 15 Emit(category g, count H{g})
應用:日誌分析,用戶計數

互相關

問題陳述:有多個各由若干項構成的組,計算項兩兩共同出現於一個組中的次數。假如項數是N,那麼應該計算N*N。

這種狀況常見於文本分析(條目是單詞而元組是句子),市場分析(購買了此物的客戶還可能購買什麼)。若是N*N小到能夠容納於一臺機器的內存,實現起來就比較簡單了。

配對法

第一種方法是在Mapper中給全部條目配對,而後在Reducer中將同一條目對的計數加和。但這種作法也有缺點:

  • 使用 combiners 帶來的的好處有限,由於極可能全部項對都是惟一的
  • 不能有效利用內存
  
1 class Mapper 2 method Map( null , items [i1, i2,...] ) 3 for all item i in [i1, i2,...] 4 for all item j in [i1, i2,...] 5 Emit(pair [i j], count 1 ) 6 7 class Reducer 8 method Reduce(pair [i j], counts [c1, c2,...]) 9 s = sum([c1, c2,...]) 10 Emit(pair[i j], count s)

Stripes Approach(條方法?不知道這個名字怎麼理解)

第二種方法是將數據按照pair中的第一項來分組,並維護一個關聯數組,數組中存儲的是全部關聯項的計數。The second approach is to group data by the first item in pair and maintain an associative array (「stripe」) where counters for all adjacent items are accumulated. Reducer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.

  • 中間結果的鍵數量相對較少,所以減小了排序消耗。
  • 能夠有效利用 combiners。
  • 可在內存中執行,不過若是沒有正確執行的話也會帶來問題。
  • 實現起來比較複雜。
  • 通常來講, 「stripes」 比 「pairs」 更快
  
1 class Mapper 2 method Map( null , items [i1, i2,...] ) 3 for all item i in [i1, i2,...] 4 H = new AssociativeArray : item -> counter 5 for all item j in [i1, i2,...] 6 H{j} = H{j} + 1 7 Emit(item i, stripe H) 8 9 class Reducer 10 method Reduce(item i, stripes [H1, H2,...]) 11 H = new AssociativeArray : item -> counter 12 H = merge - sum( [H1, H2,...] ) 13 for all item j in H.keys() 14 Emit(pair [i j], H{j})
應用:文本分析,市場分析
參考資料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapReduce

用MapReduce 表達關係模式

在這部分咱們會討論一下怎麼使用MapReduce來進行主要的關係操做。

篩選(Selection)

  
1 class Mapper 2 method Map(rowkey key, tuple t) 3 if t satisfies the predicate 4 Emit(tuple t, null )

投影(Projection)

投影只比篩選稍微複雜一點,在這種狀況下咱們能夠用Reducer來消除可能的重複值。

  
1 class Mapper 2 method Map(rowkey key, tuple t) 3 tuple g = project(t) // extract required fields to tuple g 4 Emit(tuple g, null ) 5 6 class Reducer 7 method Reduce(tuple t, array n) // n is an array of nulls 8 Emit(tuple t, null )

合併(Union)

兩個數據集中的全部記錄都送入Mapper,在Reducer裏消重。

  
1 class Mapper 2 method Map(rowkey key, tuple t) 3 Emit(tuple t, null ) 4 5 class Reducer 6 method Reduce(tuple t, array n) // n is an array of one or two nulls 7 Emit(tuple t, null )

交集(Intersection)

將兩個數據集中須要作交叉的記錄輸入Mapper,Reducer 輸出出現了兩次的記錄。由於每條記錄都有一個主鍵,在每一個數據集中只會出現一次,因此這樣作是可行的。

  
1 class Mapper 2 method Map(rowkey key, tuple t) 3 Emit(tuple t, null ) 4 5 class Reducer 6 method Reduce(tuple t, array n) // n is an array of one or two nulls 7 if n.size() = 2 8 Emit(tuple t, null )

差別(Difference)

假設有兩個數據集R和S,咱們要找出R與S的差別。Mapper將全部的元組作上標記,代表他們來自於R仍是S,Reducer只輸出那些存在於R中而不在S中的記錄。

  
1 class Mapper 2 method Map(rowkey key, tuple t) 3 Emit(tuple t, string t.SetName) // t.SetName is either 'R' or 'S' 4 5 class Reducer 6 method Reduce(tuple t, array n) // array n can be ['R'], ['S'], ['R' 'S'], or ['S', 'R'] 7 if n.size() = 1 and n[ 1 ] = ' R ' 8 Emit(tuple t, null )

分組聚合(GroupBy and Aggregation)

分組聚合能夠在以下的一個MapReduce中完成。Mapper抽取數據並將之分組聚合,Reducer 中對收到的數據再次聚合。典型的聚合應用好比求和與最值能夠以流的方式進行計算,於是不須要同時保有全部的值。可是另一些情景就必需要兩階段MapReduce,前面提到過的唯一值模式就是一個這種類型的例子。

  
1 class Mapper 2 method Map( null , tuple [value GroupBy, value AggregateBy, value ...]) 3 Emit(value GroupBy, value AggregateBy) 4 5 class Reducer 6 method Reduce(value GroupBy, [v1, v2,...]) 7 Emit(value GroupBy, aggregate( [v1, v2,...] ) ) 8 // aggregate() : sum(), max(),...

鏈接(Joining)

MapperReduce框架能夠很好地處理鏈接,不過在面對不一樣的數據量和處理效率要求的時候仍是有一些技巧。在這部分咱們會介紹一些基本方法,在後面的參考文檔中還列出了一些關於這方面的專題文章。

分配後鏈接 (Reduce端鏈接,排序-合併鏈接)

這個算法按照鍵K來鏈接數據集R和L。Mapper 遍歷R和L中的全部元組,以K爲鍵輸出每個標記了來自於R仍是L的元組,Reducer把同一個K的數據分裝入兩個容器(R和L),而後嵌套循環遍歷兩個容器中的數據以獲得交集,最後輸出的每一條結果都包含了R中的數據、L中的數據和K。這種方法有如下缺點:

  • Mapper要輸出全部的數據,即便一些key只會在一個集合中出現。
  • Reducer 要在內存中保有一個key的全部數據,若是數據量打過了內存,那麼就要緩存到硬盤上,這就增長了硬盤IO的消耗。

儘管如此,再分配鏈接方式仍然是最通用的方法,特別是其餘優化技術都不適用的時候。

  
1 class Mapper 2 method Map( null , tuple [join_key k, value v1, value v2,...]) 3 Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] ) 4 5 class Reducer 6 method Reduce(join_key k, tagged_tuples [t1, t2,...]) 7 H = new AssociativeArray : set_name -> values 8 for all tagged_tuple t in [t1, t2,...] // separate values into 2 arrays 9 H{t.tag}.add(t.values) 10 for all values r in H{ ' R ' } // produce a cross-join of the two arrays 11 for all values l in H{ ' L ' } 12 Emit( null , [k r l] )

複製連接Replicated Join (Mapper端鏈接, Hash 鏈接)

在實際應用中,將一個小數據集和一個大數據集鏈接是很常見的(如用戶與日誌記錄)。假定要鏈接兩個集合R和L,其中R相對較小,這樣,能夠把R分發給全部的Mapper,每一個Mapper均可以載入它並以鏈接鍵來索引其中的數據,最經常使用和有效的索引技術就是哈希表。以後,Mapper遍歷L,並將其與存儲在哈希表中的R中的相應記錄鏈接,。這種方法很是高效,由於不須要對L中的數據排序,也不須要經過網絡傳送L中的數據,可是R必須足夠小到可以分發給全部的Mapper。

  
1 class Mapper 2 method Initialize 3 H = new AssociativeArray : join_key -> tuple from R 4 R = loadR() 5 for all [ join_key k, tuple [r1, r2,...] ] in R 6 H{k} = H{k}.append( [r1, r2,...] ) 7 8 method Map(join_key k, tuple l) 9 for all tuple r in H{k} 10 Emit( null , tuple [k r l] )
參考資料:
  1. Join Algorithms using Map/Reduce
  2. Optimizing Joins in a MapReduce Environment

應用於機器學習和數學方面的 MapReduce 算法

相關文章
相關標籤/搜索