Google MapReduce中文版
譯者: alex
摘要
MapReduce 是一個編程模型,也是一個處理和生成超大數據集的算法模型的相關實現。用戶首先建立一個Map函數處理一個基於key/value pair的數據集合,輸出中間的基於key/value pair的數據集合;而後再建立一個Reduce函數用來合併全部的具備相同中間key值的中間value值。現實世界中有不少知足上述處理模型的例子, 本論文將詳細描述這個模型。
MapReduce架構的程序可以在大量的 普通配置的計算機上實現並行化處理。這個系統在運行時只關心:如何分割輸入數據,在大量計算機組成的集羣上的調度,集羣中計算機的錯誤處理,管理集羣中計 算機之間必要的通訊。採用MapReduce架構可使那些沒有並行計算和分佈式處理系統開發經驗的程序員有效利用分佈式系統的豐富資源。
我 們的MapReduce實現運行在規模能夠靈活調整的由普通機器組成的集羣上:一個典型的MapReduce計算每每由幾千臺機器組成、處理以TB計算的 數據。程序員發現這個系統很是好用:已經實現了數以百計的MapReduce程序,在Google的集羣上,天天都有1000多個MapReduce程序 在執行。
一、介紹
在 過去的5年裏,包括本文做者在內的Google的不少程序員,爲了處理海量的原始數據,已經實現了數以百計的、專用的計算方法。這些計算方法用來處理大量 的原始數據,好比,文檔抓取(相似網絡爬蟲的程序)、Web請求日誌等等;也爲了計算處理各類類型的衍生數據,好比倒排索引、Web文檔的圖結構的各類表 示形勢、每臺主機上網絡爬蟲抓取的頁面數量的彙總、天天被請求的最多的查詢的集合等等。大多數這樣的數據處理運算在概念上很容易理解。然而因爲輸入的數據 量巨大,所以要想在可接受的時間內完成運算,只有將這些計算分佈在成百上千的主機上。如何處理並行計算、如何分發數據、如何處理錯誤?全部這些問題綜合在 一塊兒,須要大量的代碼處理,所以也使得本來簡單的運算變得難以處理。
爲 瞭解決上述複雜的問題,咱們設計一個新的抽象模型,使用這個抽象模型,咱們只要表述咱們想要執行的簡單運算便可,而沒必要關心並行計算、容錯、數據分佈、負 載均衡等複雜的細節,這些問題都被封裝在了一個庫裏面。設計這個抽象模型的靈感來自Lisp和許多其餘函數式語言的Map和Reduce的原語。咱們意識 到咱們大多數的運算都包含這樣的操做:在輸入數據的「邏輯」記錄上應用Map操做得出一箇中間key/value pair集合,而後在全部具備相同key值的value值上應用Reduce操做,從而達到合併中間的數據,獲得一個想要的結果的目的。使用 MapReduce模型,再結合用戶實現的Map和Reduce函數,咱們就能夠很是容易的實現大規模並行化計算;經過MapReduce模型自帶的「再 次執行」(re-execution)功能,也提供了初級的容災實現方案。
這個工做(實現一個MapReduce框架模型)的主要貢獻是經過簡單的接口來實現自動的並行化和大規模的分佈式計算,經過使用MapReduce模型接口實如今大量普通的PC機上高性能計算。
第 二部分描述基本的編程模型和一些使用案例。第三部分描述了一個通過裁剪的、適合咱們的基於集羣的計算環境的MapReduce實現。第四部分描述咱們認爲 在MapReduce編程模型中一些實用的技巧。第五部分對於各類不一樣的任務,測量咱們MapReduce實現的性能。第六部分揭示了在Google內部 如何使用MapReduce做爲基礎重寫咱們的索引系統產品,包括其它一些使用MapReduce的經驗。第七部分討論相關的和將來的工做。
二、編程模型
MapReduce編程模型的原理是:利用一個輸入key/value pair集合來產生一個輸出的key/value pair集合。MapReduce庫的用戶用兩個函數表達這個計算:Map和Reduce。
用戶自定義的Map函數接受一個輸入的key/value pair值,而後產生一箇中間key/value pair值的集合。MapReduce庫把全部具備相同中間key值I的中間value值集合在一塊兒後傳遞給reduce函數。
用 戶自定義的Reduce函數接受一箇中間key的值I和相關的一個value值的集合。Reduce函數合併這些value值,造成一個較小的value 值的集合。通常的,每次Reduce函數調用只產生0或1個輸出value值。一般咱們經過一個迭代器把中間value值提供給Reduce函數,這樣我 們就能夠處理沒法所有放入內存中的大量的value值的集合。
2.一、例子
例如,計算一個大的文檔集合中每一個單詞出現的次數,下面是僞代碼段:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, 「1″);
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函數輸出文檔中的每一個詞、以及這個詞的出現次數(在這個簡單的例子裏就是1)。Reduce函數把Map函數產生的每個特定的詞的計數累加起來。
另 外,用戶編寫代碼,使用輸入和輸出文件的名字、可選的調節參數來完成一個符合MapReduce模型規範的對象,而後調用MapReduce函數,並把這 個規範對象傳遞給它。用戶的代碼和MapReduce庫連接在一塊兒(用C++實現)。附錄A包含了這個實例的所有程序代碼。
2.二、類型
儘管在前面例子的僞代碼中使用了以字符串表示的輸入輸出值,可是在概念上,用戶定義的Map和Reduce函數都有相關聯的類型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
好比,輸入的key和value值與輸出的key和value值在類型上推導的域不一樣。此外,中間key和value值與輸出key和value值在類型上推導的域相同。
(alex注:原文中這個domain的含義不是很清楚,我參考Hadoop、KFS等實現,map和reduce都使用了泛型,所以,我把domain翻譯成類型推導的域)。
咱們的C++中使用字符串類型做爲用戶自定義函數的輸入輸出,用戶在本身的代碼中對字符串進行適當的類型轉換。html
2.三、更多的例子
這裏還有一些有趣的簡單例子,能夠很容易的使用MapReduce模型來表示:
-
分佈式的Grep:Map函數輸出匹配某個模式的一行,Reduce函數是一個恆等函數,即把中間數據複製到輸出。
-
計算URL訪問頻率:Map函數處理日誌中web頁面請求的記錄,而後輸出(URL,1)。Reduce函數把相同URL的value值都累加起來,產生(URL,記錄總數)結果。
-
倒轉網絡連接圖:Map函數在源頁面(source)中搜索全部的連接目標(target)並輸出爲(target,source)。Reduce函數把給定連接目標(target)的連接組合成一個列表,輸出(target,list(source))。
-
每 個主機的檢索詞向量:檢索詞向量用一個(詞,頻率)列表來概述出如今文檔或文檔集中的最重要的一些詞。Map函數爲每個輸入文檔輸出(主機名,檢索詞向 量),其中主機名來自文檔的URL。Reduce函數接收給定主機的全部文檔的檢索詞向量,並把這些檢索詞向量加在一塊兒,丟棄掉低頻的檢索詞,輸出一個最 終的(主機名,檢索詞向量)。
-
倒排索引:Map函數分析每一個文檔輸出一個(詞,文檔號)的列表,Reduce函數的輸入是一個給定詞的全部(詞,文檔號),排序全部的文檔號,輸出(詞,list(文檔號))。全部的輸出集合造成一個簡單的倒排索引,它以一種簡單的算法跟蹤詞在文檔中的位置。
-
分佈式排序:Map函數從每一個記錄提取key,輸出(key,record)。Reduce函數不改變任何的值。這個運算依賴分區機制(在4.1描述)和排序屬性(在4.2描述)。
三、實現
MapReduce模型能夠有多種不一樣的實現方式。如何正確選擇取決於具體的環境。例如,一種實現方式適用於小型的共享內存方式的機器,另一種實現方式則適用於大型NUMA架構的多處理器的主機,而有的實現方式更適合大型的網絡鏈接集羣。
本章節描述一個適用於Google內部普遍使用的運算環境的實現:用以太網交換機鏈接、由普通PC機組成的大型集羣。在咱們的環境裏包括:
1.x86架構、運行Linux操做系統、雙處理器、2-4GB內存的機器。
2.普通的網絡硬件設備,每一個機器的帶寬爲百兆或者千兆,可是遠小於網絡的平均帶寬的一半。
(alex注:這裏須要網絡專家解釋一下了)
3.集羣中包含成百上千的機器,所以,機器故障是常態。
4.存儲爲廉價的內置IDE硬盤。一個內部分佈式文件系統用來管理存儲在這些磁盤上的數據。文件系統經過數據複製來在不可靠的硬件上保證數據的可靠性和有效性。
5.用戶提交工做(job)給調度系統。每一個工做(job)都包含一系列的任務(task),調度系統將這些任務調度到集羣中多臺可用的機器上。
3.一、執行歸納
通 過將Map調用的輸入數據自動分割爲M個數據片斷的集合,Map調用被分佈到多臺機器上執行。輸入的數據片斷可以在不一樣的機器上並行處理。使用分區函數將 Map調用產生的中間key值分紅R個不一樣分區(例如,hash(key) mod R),Reduce調用也被分佈到多臺機器上執行。分區數量(R)和分區函數由用戶來指定。
圖1展現了咱們的MapReduce實現中操做的所有流程。當用戶調用MapReduce函數時,將發生下面的一系列動做(下面的序號和圖1中的序號一一對應):
1.用戶程序首先調用的MapReduce庫將輸入文件分紅M個數據片度,每一個數據片斷的大小通常從 16MB到64MB(能夠經過可選的參數來控制每一個數據片斷的大小)。而後用戶程序在機羣中建立大量的程序副本。
(alex:copies of the program還真難翻譯)
2.這些程序副本中的有一個特殊的程序–master。副本中其它的程序都是worker程序,由master分配任務。有M個Map任務和R個Reduce任務將被分配,master將一個Map任務或Reduce任務分配給一個空閒的worker。
3.被分配了map任務的worker程序讀取相關的輸入數據片斷,從輸入的數據片斷中解析出key/value pair,而後把key/value pair傳遞給用戶自定義的Map函數,由Map函數生成並輸出的中間key/value pair,並緩存在內存中。
4.緩存中的key/value pair經過分區函數分紅R個區域,以後週期性的寫入到本地磁盤上。緩存的key/value pair在本地磁盤上的存儲位置將被回傳給master,由master負責把這些存儲位置再傳送給Reduce worker。
5.當Reduce worker程序接收到master程序發來的數據存儲位置信息後,使用RPC從Map worker所在主機的磁盤上讀取這些緩存數據。當Reduce worker讀取了全部的中間數據後,經過對key進行排序後使得具備相同key值的數據聚合在一塊兒。因爲許多不一樣的key值會映射到相同的Reduce 任務上,所以必須進行排序。若是中間數據太大沒法在內存中完成排序,那麼就要在外部進行排序。
6.Reduce worker程序遍歷排序後的中間數據,對於每個惟一的中間key值,Reduce worker程序將這個key值和它相關的中間value值的集合傳遞給用戶自定義的Reduce函數。Reduce函數的輸出被追加到所屬分區的輸出文件。
7.當全部的Map和Reduce任務都完成以後,master喚醒用戶程序。在這個時候,在用戶程序裏的對MapReduce調用才返回。
在 成功完成任務以後,MapReduce的輸出存放在R個輸出文件中(對應每一個Reduce任務產生一個輸出文件,文件名由用戶指定)。通常狀況下,用戶不 須要將這R個輸出文件合併成一個文件–他們常常把這些文件做爲另一個MapReduce的輸入,或者在另一個能夠處理多個分割文件的分佈式應用中使 用。程序員
3.二、Master數據結構
Master持有一些數據結構,它存儲每個Map和Reduce任務的狀態(空閒、工做中或完成),以及Worker機器(非空閒任務的機器)的標識。
Master 就像一個數據管道,中間文件存儲區域的位置信息經過這個管道從Map傳遞到Reduce。所以,對於每一個已經完成的Map任務,master存儲了Map 任務產生的R箇中間文件存儲區域的大小和位置。當Map任務完成時,Master接收到位置和大小的更新信息,這些信息被逐步遞增的推送給那些正在工做的 Reduce任務。
3.三、容錯
由於MapReduce庫的設計初衷是使用由成百上千的機器組成的集羣來處理超大規模的數據,因此,這個庫必需要能很好的處理機器故障。
worker故障
master 週期性的ping每一個worker。若是在一個約定的時間範圍內沒有收到worker返回的信息,master將把這個worker標記爲失效。全部由這 個失效的worker完成的Map任務被重設爲初始的空閒狀態,以後這些任務就能夠被安排給其餘的worker。一樣的,worker失效時正在運行的 Map或Reduce任務也將被從新置爲空閒狀態,等待從新調度。
當worker故障時,因爲已經完成的Map任務的輸出存儲在這臺機器上,Map任務的輸出已不可訪問了,所以必須從新執行。而已經完成的Reduce任務的輸出存儲在全局文件系統上,所以不須要再次執行。web
當 一個Map任務首先被worker A執行,以後因爲worker A失效了又被調度到worker B執行,這個「從新執行」的動做會被通知給全部執行Reduce任務的worker。任何尚未從worker A讀取數據的Reduce任務將從worker B讀取數據。
MapReduce 能夠處理大規模worker失效的狀況。好比,在一個MapReduce操做執行期間,在正在運行的集羣上進行網絡維護引發80臺機器在幾分鐘內不可訪問 了,MapReduce master只須要簡單的再次執行那些不可訪問的worker完成的工做,以後繼續執行未完成的任務,直到最終完成這個MapReduce操做。
master失敗
一個簡單的解決辦法是讓master週期性的將上面描述的數據結構
(alex注:指3.2節)的 寫入磁盤,即檢查點(checkpoint)。若是這個master任務失效了,能夠從最後一個檢查點(checkpoint)開始啓動另外一個 master進程。然而,因爲只有一個master進程,master失效後再恢復是比較麻煩的,所以咱們如今的實現是若是master失效,就停止 MapReduce運算。客戶能夠檢查到這個狀態,而且能夠根據須要從新執行MapReduce操做。
在失效方面的處理機制
(alex注:原文爲」semantics in the presence of failures」)
當用戶提供的Map和Reduce操做是輸入肯定性函數(即相同的輸入產生相同的輸出)時,咱們的分佈式實如今任何狀況下的輸出都和全部程序沒有出現任何錯誤、順序的執行產生的輸出是同樣的。
我 們依賴對Map和Reduce任務的輸出是原子提交的來完成這個特性。每一個工做中的任務把它的輸出寫到私有的臨時文件中。每一個Reduce任務生成一個這 樣的文件,而每一個Map任務則生成R個這樣的文件(一個Reduce任務對應一個文件)。當一個Map任務完成的時,worker發送一個包含R個臨時文 件名的完成消息給master。若是master從一個已經完成的Map任務再次接收到到一個完成消息,master將忽略這個消息;不然,master 將這R個文件的名字記錄在數據結構裏。
當Reduce任務完成 時,Reduce worker進程以原子的方式把臨時文件重命名爲最終的輸出文件。若是同一個Reduce任務在多臺機器上執行,針對同一個最終的輸出文件將有多個重命名 操做執行。咱們依賴底層文件系統提供的重命名操做的原子性來保證最終的文件系統狀態僅僅包含一個Reduce任務產生的數據。
使 用MapReduce模型的程序員能夠很容易的理解他們程序的行爲,由於咱們絕大多數的Map和Reduce操做是肯定性的,並且存在這樣的一個事實:我 們的失效處理機制等價於一個順序的執行的操做。當Map或/和Reduce操做是不肯定性的時候,咱們提供雖然較弱可是依然合理的處理機制。當使用非肯定 操做的時候,一個Reduce任務R1的輸出等價於一個非肯定性程序順序執行產生時的輸出。可是,另外一個Reduce任務R2的輸出也許符合一個不一樣的非 肯定順序程序執行產生的R2的輸出。算法
考慮Map任務M和Reduce任務R一、R2的狀況。咱們設定e(Ri)是Ri已經提交的執行過程(有且僅有一個這樣的執行過程)。當e(R1)讀取了由M一次執行產生的輸出,而e(R2)讀取了由M的另外一次執行產生的輸出,致使了較弱的失效處理。
3.四、存儲位置
在 咱們的計算運行環境中,網絡帶寬是一個至關匱乏的資源。咱們經過儘可能把輸入數據(由GFS管理)存儲在集羣中機器的本地磁盤上來節省網絡帶寬。GFS把每 個文件按64MB一個Block分隔,每一個Block保存在多臺機器上,環境中就存放了多份拷貝(通常是3個拷貝)。MapReduce的master在 調度Map任務時會考慮輸入文件的位置信息,儘可能將一個Map任務調度在包含相關輸入數據拷貝的機器上執行;若是上述努力失敗了,master將嘗試在保 存有輸入數據拷貝的機器附近的機器上執行Map任務(例如,分配到一個和包含輸入數據的機器在一個switch裏的worker機器上執行)。當在一個足 夠大的cluster集羣上運行大型MapReduce操做的時候,大部分的輸入數據都能從本地機器讀取,所以消耗很是少的網絡帶寬。
3.五、任務粒度
如 前所述,咱們把Map拆分紅了M個片斷、把Reduce拆分紅R個片斷執行。理想狀況下,M和R應當比集羣中worker的機器數量要多得多。在每臺 worker機器都執行大量的不一樣任務可以提升集羣的動態的負載均衡能力,而且可以加快故障恢復的速度:失效機器上執行的大量Map任務均可以分佈到全部 其餘的worker機器上去執行。
可是實際上,在咱們的具體實現中對M和R的取值都有必定的客觀限制,由於master必須執行O(M+R)次調度,而且在內存中保存O(M*R)個狀態(對影響內存使用的因素仍是比較小的:O(M*R)塊狀態,大概每對Map任務/Reduce任務1個字節就能夠了)。數據庫
更 進一步,R值一般是由用戶指定的,由於每一個Reduce任務最終都會生成一個獨立的輸出文件。實際使用時咱們也傾向於選擇合適的M值,以使得每個獨立任 務都是處理大約16M到64M的輸入數據(這樣,上面描寫的輸入數據本地存儲優化策略才最有效),另外,咱們把R值設置爲咱們想使用的worker機器數 量的小的倍數。咱們一般會用這樣的比例來執行MapReduce:M=200000,R=5000,使用2000臺worker機器。
3.六、備用任務
影 響一個MapReduce的總執行時間最一般的因素是「落伍者」:在運算過程當中,若是有一臺機器花了很長的時間才完成最後幾個Map或Reduce任務, 致使MapReduce操做總的執行時間超過預期。出現「落伍者」的緣由很是多。好比:若是一個機器的硬盤出了問題,在讀取的時候要常常的進行讀取糾錯操 做,致使讀取數據的速度從30M/s下降到1M/s。若是cluster的調度系統在這臺機器上又調度了其餘的任務,因爲CPU、內存、本地硬盤和網絡帶 寬等競爭因素的存在,致使執行MapReduce代碼的執行效率更加緩慢。咱們最近遇到的一個問題是因爲機器的初始化代碼有bug,致使關閉了的處理器的 緩存:在這些機器上執行任務的性能和正常狀況相差上百倍。
咱們有一個通 用的機制來減小「落伍者」出現的狀況。當一個MapReduce操做接近完成的時候,master調度備用(backup)任務進程來執行剩下的、處於處 理中狀態(in-progress)的任務。不管是最初的執行進程、仍是備用(backup)任務進程完成了任務,咱們都把這個任務標記成爲已經完成。我 們調優了這個機制,一般只會佔用比正常操做多幾個百分點的計算資源。咱們發現採用這樣的機制對於減小超大MapReduce操做的總處理時間效果顯著。例 如,在5.3節描述的排序任務,在關閉掉備用任務的狀況下要多花44%的時間完成排序任務。
四、技巧
雖然簡單的Map和Reduce函數提供的基本功能已經可以知足大部分的計算須要,咱們仍是發掘出了一些有價值的擴展功能。本節將描述這些擴展功能。編程
4.一、分區函數
MapReduce 的使用者一般會指定Reduce任務和Reduce任務輸出文件的數量(R)。咱們在中間key上使用分區函數來對數據進行分區,以後再輸入到後續任務執 行進程。一個缺省的分區函數是使用hash方法(好比,hash(key) mod R)進行分區。hash方法能產生很是平衡的分區。然而,有的時候,其它的一些分區函數對key值進行的分區將很是有用。好比,輸出的key值是 URLs,咱們但願每一個主機的全部條目保持在同一個輸出文件中。爲了支持相似的狀況,MapReduce庫的用戶須要提供專門的分區函數。例如,使用 「hash(Hostname(urlkey)) mod R」做爲分區函數就能夠把全部來自同一個主機的URLs保存在同一個輸出文件中。
4.二、順序保證
咱們確保在給定的分區中,中間key/value pair數據的處理順序是按照key值增量順序處理的。這樣的順序保證對每一個分紅生成一個有序的輸出文件,這對於須要對輸出文件按key值隨機存取的應用很是有意義,對在排序輸出的數據集也頗有幫助。
4.三、Combiner函數
在 某些狀況下,Map函數產生的中間key值的重複數據會佔很大的比重,而且,用戶自定義的Reduce函數知足結合律和交換律。在2.1節的詞數統計程序 是個很好的例子。因爲詞頻率傾向於一個zipf分佈(齊夫分佈),每一個Map任務將產生成千上萬個這樣的記錄<the,1>。全部的這些記錄 將經過網絡被髮送到一個單獨的Reduce任務,而後由這個Reduce任務把全部這些記錄累加起來產生一個數字。咱們容許用戶指定一個可選的 combiner函數,combiner函數首先在本地將這些記錄進行一次合併,而後將合併的結果再經過網絡發送出去。
Combiner 函數在每臺執行Map任務的機器上都會被執行一次。通常狀況下,Combiner和Reduce函數是同樣的。Combiner函數和Reduce函數之 間惟一的區別是MapReduce庫怎樣控制函數的輸出。Reduce函數的輸出被保存在最終的輸出文件裏,而Combiner函數的輸出被寫到中間文件 裏,而後被髮送給Reduce任務。
部分的合併中間結果能夠顯著的提升一些MapReduce操做的速度。附錄A包含一個使用combiner函數的例子。api
4.四、輸入和輸出的類型
MapReduce 庫支持幾種不一樣的格式的輸入數據。好比,文本模式的輸入數據的每一行被視爲是一個key/value pair。key是文件的偏移量,value是那一行的內容。另一種常見的格式是以key進行排序來存儲的key/value pair的序列。每種輸入類型的實現都必須可以把輸入數據分割成數據片斷,該數據片斷可以由單獨的Map任務來進行後續處理(例如,文本模式的範圍分割必 須確保僅僅在每行的邊界進行範圍分割)。雖然大多數MapReduce的使用者僅僅使用不多的預約義輸入類型就知足要求了,可是使用者依然能夠經過提供一 個簡單的Reader接口實現就可以支持一個新的輸入類型。
Reader並不是必定要從文件中讀取數據,好比,咱們能夠很容易的實現一個從數據庫裏讀記錄的Reader,或者從內存中的數據結構讀取數據的Reader。數組
相似的,咱們提供了一些預約義的輸出數據的類型,經過這些預約義類型可以產生不一樣格式的數據。用戶採用相似添加新的輸入數據類型的方式增長新的輸出類型。緩存
4.五、反作用
在某些狀況下,MapReduce的使用者發現,若是在Map和/或Reduce操做過程當中增長輔助的輸出文件會比較省事。咱們依靠程序writer把這種「反作用」變成原子的和冪等的
(alex注:冪等的指一個老是產生相同結果的數學運算)。一般應用程序首先把輸出結果寫到一個臨時文件中,在輸出所有數據以後,在使用系統級的原子操做rename從新命名這個臨時文件。
若是一個任務產生了多個輸出文件,咱們沒有提供相似兩階段提交的原子操做支持這種狀況。所以,對於會產生多個輸出文件、而且對於跨文件有一致性要求的任務,都必須是肯定性的任務。可是在實際應用過程當中,這個限制尚未給咱們帶來過麻煩。服務器
4.六、跳過損壞的記錄
有 時候,用戶程序中的bug致使Map或者Reduce函數在處理某些記錄的時候crash掉,MapReduce操做沒法順利完成。慣常的作法是修復 bug後再次執行MapReduce操做,可是,有時候找出這些bug並修復它們不是一件容易的事情;這些bug也許是在第三方庫裏邊,而咱們手頭沒有這 些庫的源代碼。並且在不少時候,忽略一些有問題的記錄也是能夠接受的,好比在一個巨大的數據集上進行統計分析的時候。咱們提供了一種執行模式,在這種模式 下,爲了保證保證整個處理能繼續進行,MapReduce會檢測哪些記錄致使肯定性的crash,而且跳過這些記錄不處理。
每 個worker進程都設置了信號處理函數捕獲內存段異常(segmentation violation)和總線錯誤(bus error)。在執行Map或者Reduce操做以前,MapReduce庫經過全局變量保存記錄序號。若是用戶程序觸發了一個系統信號,消息處理函數將 用「最後一口氣」經過UDP包向master發送處理的最後一條記錄的序號。當master看到在處理某條特定記錄不止失敗一次時,master就標誌着 條記錄須要被跳過,而且在下次從新執行相關的Map或者Reduce任務的時候跳過這條記錄。
4.七、本地執行
調 試Map和Reduce函數的bug是很是困難的,由於實際執行操做時不可是分佈在系統中執行的,並且一般是在好幾千臺計算機上執行,具體的執行位置是由 master進行動態調度的,這又大大增長了調試的難度。爲了簡化調試、profile和小規模測試,咱們開發了一套MapReduce庫的本地實現版 本,經過使用本地版本的MapReduce庫,MapReduce操做在本地計算機上順序的執行。用戶能夠控制MapReduce操做的執行,能夠把操做 限制到特定的Map任務上。用戶經過設定特別的標誌來在本地執行他們的程序,以後就能夠很容易的使用本地調試和測試工具(好比gdb)。
4.八、狀態信息
master 使用嵌入式的HTTP服務器(如Jetty)顯示一組狀態信息頁面,用戶能夠監控各類執行狀態。狀態信息頁面顯示了包括計算執行的進度,好比已經完成了多 少任務、有多少任務正在處理、輸入的字節數、中間數據的字節數、輸出的字節數、處理百分比等等。頁面還包含了指向每一個任務的stderr和stdout文 件的連接。用戶根據這些數據預測計算須要執行大約多長時間、是否須要增長額外的計算資源。這些頁面也能夠用來分析何時計算執行的比預期的要慢。
另外,處於最頂層的狀態頁面顯示了哪些worker失效了,以及他們失效的時候正在運行的Map和Reduce任務。這些信息對於調試用戶代碼中的bug頗有幫助。
4.九、計數器
MapReduce庫使用計數器統計不一樣事件發生次數。好比,用戶可能想統計已經處理了多少個單詞、已經索引的多少篇German文檔等等。
爲了使用這個特性,用戶在程序中建立一個命名的計數器對象,在Map和Reduce函數中相應的增長計數器的值。例如:
Counter* uppercase;
uppercase = GetCounter(「uppercase」);
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, 「1″);
這些計數器的值週期性的從各個單獨的worker機器上傳遞給master(附加在ping的應答包中傳遞)。master把執行成功的Map和Reduce任務的計數器值進行累計,當MapReduce操做完成以後,返回給用戶代碼。
計數器當前的值也會顯示在master的狀態頁面上,這樣用戶就能夠看到當前計算的進度。當累加計數器的值的時候,master要檢查重複運行的Map或者Reduce任務,避免重複累加(以前提到的備用任務和失效後從新執行任務這兩種狀況會致使相同的任務被屢次執行)。
有些計數器的值是由MapReduce庫自動維持的,好比已經處理的輸入的key/value pair的數量、輸出的key/value pair的數量等等。
計數器機制對於MapReduce操做的完整性檢查很是有用。好比,在某些MapReduce操做中,用戶須要確保輸出的key value pair精確的等於輸入的key value pair,或者處理的German文檔數量在處理的整個文檔數量中屬於合理範圍。
五、性能
本節咱們用在一個大型集羣上運行的兩個計算來衡量MapReduce的性能。一個計算在大約1TB的數據中進行特定的模式匹配,另外一個計算對大約1TB的數據進行排序。
這兩個程序在大量的使用MapReduce的實際應用中是很是典型的 — 一類是對數據格式進行轉換,從一種表現形式轉換爲另一種表現形式;另外一類是從海量數據中抽取少部分的用戶感興趣的數據。
5.一、集羣配置
所 有這些程序都運行在一個大約由1800臺機器構成的集羣上。每臺機器配置2個2G主頻、支持超線程的Intel Xeon處理器,4GB的物理內存,兩個160GB的IDE硬盤和一個千兆以太網卡。這些機器部署在一個兩層的樹形交換網絡中,在root節點大概有 100-200GBPS的傳輸帶寬。全部這些機器都採用相同的部署(對等部署),所以任意兩點之間的網絡來回時間小於1毫秒。
在4GB內存裏,大概有1-1.5G用於運行在集羣上的其餘任務。測試程序在週末下午開始執行,這時主機的CPU、磁盤和網絡基本上處於空閒狀態。
5.二、GREP
這個分佈式的grep程序須要掃描大概10的10次方個由100個字節組成的記錄,查找出現機率較小的3個字符的模式(這個模式在92337個記錄中出現)。輸入數據被拆分紅大約64M的Block(M=15000),整個輸出數據存放在一個文件中(R=1)。
圖 2顯示了這個運算隨時間的處理過程。其中Y軸表示輸入數據的處理速度。處理速度隨着參與MapReduce計算的機器數量的增長而增長,當1764臺 worker參與計算的時,處理速度達到了30GB/s。當Map任務結束的時候,即在計算開始後80秒,輸入的處理速度降到0。整個計算過程從開始到結 束一共花了大概150秒。這包括了大約一分鐘的初始啓動階段。初始啓動階段消耗的時間包括了是把這個程序傳送到各個worker機器上的時間、等待GFS 文件系統打開1000個輸入文件集合的時間、獲取相關的文件本地位置優化信息的時間。
5.三、排序
排序程序處理10的10次方個100個字節組成的記錄(大概1TB的數據)。這個程序模仿TeraSort benchmark[10]。
排 序程序由不到50行代碼組成。只有三行的Map函數從文本行中解析出10個字節的key值做爲排序的key,而且把這個key和原始文本行做爲中間的 key/value pair值輸出。咱們使用了一個內置的恆等函數做爲Reduce操做函數。這個函數把中間的key/value pair值不做任何改變輸出。最終排序結果輸出到兩路複製的GFS文件系統(也就是說,程序輸出2TB的數據)。
如前所述,輸入數據被分紅64MB的Block(M=15000)。咱們把排序後的輸出結果分區後存儲到4000個文件(R=4000)。分區函數使用key的原始字節來把數據分區到R個片斷中。
在這個benchmark測試中,咱們使用的分區函數知道key的分區狀況。一般對於排序程序來講,咱們會增長一個預處理的MapReduce操做用於採樣key值的分佈狀況,經過採樣的數據來計算對最終排序處理的分區點。
圖 三(a)顯示了這個排序程序的正常執行過程。左上的圖顯示了輸入數據讀取的速度。數據讀取速度峯值會達到13GB/s,而且全部Map任務完成以後,即大 約200秒以後迅速滑落到0。值得注意的是,排序程序輸入數據讀取速度小於分佈式grep程序。這是由於排序程序的Map任務花了大約一半的處理時間和 I/O帶寬把中間輸出結果寫到本地硬盤。相應的分佈式grep程序的中間結果輸出幾乎能夠忽略不計。
左 邊中間的圖顯示了中間數據從Map任務發送到Reduce任務的網絡速度。這個過程從第一個Map任務完成以後就開始緩慢啓動了。圖示的第一個高峯是啓動 了第一批大概1700個Reduce任務(整個MapReduce分佈到大概1700臺機器上,每臺機器1次最多執行1個Reduce任務)。排序程序運 行大約300秒後,第一批啓動的Reduce任務有些完成了,咱們開始執行剩下的Reduce任務。全部的處理在大約600秒後結束。
左 下圖表示Reduce任務把排序後的數據寫到最終的輸出文件的速度。在第一個排序階段結束和數據開始寫入磁盤之間有一個小的延時,這是由於worker機 器正在忙於排序中間數據。磁盤寫入速度在2-4GB/s持續一段時間。輸出數據寫入磁盤大約持續850秒。計入初始啓動部分的時間,整個運算消耗了891 秒。這個速度和TeraSort benchmark[18]的最高紀錄1057秒相差很少。
還 有一些值得注意的現象:輸入數據的讀取速度比排序速度和輸出數據寫入磁盤速度要高很多,這是由於咱們的輸入數據本地化優化策略起了做用 — 絕大部分數據都是從本地硬盤讀取的,從而節省了網絡帶寬。排序速度比輸出數據寫入到磁盤的速度快,這是由於輸出數據寫了兩份(咱們使用了2路的GFS文件 系統,寫入複製節點的緣由是爲了保證數據可靠性和可用性)。咱們把輸出數據寫入到兩個複製節點的緣由是由於這是底層文件系統的保證數據可靠性和可用性的實 現機制。若是底層文件系統使用相似容錯編碼[14](erasure coding)的方式而不是複製的方式保證數據的可靠性和可用性,那麼在輸出數據寫入磁盤的時候,就能夠下降網絡帶寬的使用。
5.四、高效的backup任務
圖 三(b)顯示了關閉了備用任務後排序程序執行狀況。執行的過程和圖3(a)很類似,除了輸出數據寫磁盤的動做在時間上拖了一個很長的尾巴,並且在這段時間 裏,幾乎沒有什麼寫入動做。在960秒後,只有5個Reduce任務沒有完成。這些拖後腿的任務又執行了300秒才完成。整個計算消耗了1283秒,多了 44%的執行時間。
5.五、失效的機器
在圖三(c)中演示的排序程序執行的過程當中,咱們在程序開始後幾分鐘有意的kill了1746個worker中的200個。集羣底層的調度馬上在這些機器上從新開始新的worker處理進程(由於只是worker機器上的處理進程被kill了,機器自己還在工做)。
圖 三(c)顯示出了一個「負」的輸入數據讀取速度,這是由於一些已經完成的Map任務丟失了(因爲相應的執行Map任務的worker進程被kill了), 須要從新執行這些任務。相關Map任務很快就被從新執行了。整個運算在933秒內完成,包括了初始啓動時間(只比正常執行多消耗了5%的時間)。
六、經驗
我 們在2003年1月完成了第一個版本的MapReduce庫,在2003年8月的版本有了顯著的加強,這包括了輸入數據本地優化、worker機器之間的 動態負載均衡等等。從那之後,咱們驚喜的發現,MapReduce庫能普遍應用於咱們平常工做中遇到的各種問題。它如今在Google內部各個領域獲得廣 泛應用,包括:
-
大規模機器學習問題
-
Google News和Froogle產品的集羣問題
-
從公衆查詢產品(好比Google的Zeitgeist)的報告中抽取數據。
-
從大量的新應用和新產品的網頁中提取有用信息(好比,從大量的位置搜索網頁中抽取地理位置信息)。
-
大規模的圖形計算。
圖 四顯示了在咱們的源代碼管理系統中,隨着時間推移,獨立的MapReduce程序數量的顯著增長。從2003年早些時候的0個增加到2004年9月份的差 很少900個不一樣的程序。MapReduce的成功取決於採用MapReduce庫可以在不到半個小時時間內寫出一個簡單的程序,這個簡單的程序可以在上 千臺機器的組成的集羣上作大規模併發處理,這極大的加快了開發和原形設計的週期。另外,採用MapReduce庫,可讓徹底沒有分佈式和/或並行系統開 發經驗的程序員很容易的利用大量的資源,開發出分佈式和/或並行處理的應用。
在每一個任務結束的時候,MapReduce庫統計計算資源的使用情況。在表1,咱們列出了2004年8月份MapReduce運行的任務所佔用的相關資源。
6.一、大規模索引
到目前爲止,MapReduce最成功的應用就是重寫了Google網絡搜索服務所使用到的index系統。索引系統的輸入數據是網絡爬蟲抓取回來的海量的文檔,這些文檔數據都保存在GFS文件系統裏。這些文檔原始內容
(alex注:raw contents,我認爲就是網頁中的剔除html標記後的內容、pdf和word等有格式文檔中提取的文本內容等)的大小超過了20TB。索引程序是經過一系列的MapReduce操做(大約5到10次)來創建索引。使用MapReduce(替換上一個特別設計的、分佈式處理的索引程序)帶來這些好處:
-
實現索引部分的代碼簡單、小巧、容易理解,由於對於容錯、分佈式以及並行計算的處理都是MapReduce庫提供的。好比,使用MapReduce庫,計算的代碼行數從原來的3800行C++代碼減小到大概700行代碼。
-
MapReduce 庫的性能已經足夠好了,所以咱們能夠把在概念上不相關的計算步驟分開處理,而不是混在一塊兒以期減小數據傳遞的額外消耗。概念上不相關的計算步驟的隔離也使 得咱們能夠很容易改變索引處理方式。好比,對以前的索引系統的一個小更改可能要耗費好幾個月的時間,可是在使用MapReduce的新系統上,這樣的更改 只須要花幾天時間就能夠了。
-
索引系統的操做管理更容易了。由於由機器失效、機器處理速度緩慢、以及網絡的瞬間阻塞等引發的絕大部分問題都已經由MapReduce庫解決了,再也不須要操做人員的介入了。另外,咱們能夠經過在索引系統集羣中增長機器的簡單方法提升總體處理性能。
七、相關工做
不少系統都提供了嚴格的編程模式,而且經過對編程的嚴格限制來實現並行計算。例如,一個結合函數能夠經過把N個元素的數組的前綴在N個處理器上使用並行前綴算法,在log N的時間內計算完[6,9,13]
(alex注:徹底沒有明白做者在說啥,具體參考相關六、九、13文檔)。MapReduce能夠看做是咱們結合在真實環境下處理海量數據的經驗,對這些經典模型進行簡化和萃取的成果。更加值得驕傲的是,咱們還實現了基於上千臺處理器的集羣的容錯處理。相比而言,大部分併發處理系統都只在小規模的集羣上實現,而且把容錯處理交給了程序員。
Bulk Synchronous Programming[17]和一些MPI原語[11]提供了更高級別的並行處理抽象,能夠更容易寫出並行處理的程序。MapReduce和這些系統的 關鍵不一樣之處在於,MapReduce利用限制性編程模式實現了用戶程序的自動併發處理,而且提供了透明的容錯處理。
咱們數據本地優化策略的靈感來源於active disks[12,15]等技術,在active disks中,計算任務是儘可能推送到數據存儲的節點處理
(alex注:即靠近數據源處理),這樣就減小了網絡和IO子系統的吞吐量。咱們在掛載幾個硬盤的普通機器上執行咱們的運算,而不是在磁盤處理器上執行咱們的工做,可是達到的目的同樣的。
咱們的備用任務機制和Charlotte System[3]提出的eager調度機制比較相似。Eager調度機制的一個缺點是若是一個任務反覆失效,那麼整個計算就不能完成。咱們經過忽略引發故障的記錄的方式在某種程度上解決了這個問題。
MapReduce的實現依賴於一個內部的集羣管理系統,這個集羣管理系統負責在一個超大的、共享機器的集羣上分佈和運行用戶任務。雖然這個不是本論文的重點,可是有必要提一下,這個集羣管理系統在理念上和其它系統,如Condor[16]是同樣。
MapReduce 庫的排序機制和NOW-Sort[1]的操做上很相似。讀取輸入源的機器(map workers)把待排序的數據進行分區後,發送到R個Reduce worker中的一個進行處理。每一個Reduce worker在本地對數據進行排序(儘量在內存中排序)。固然,NOW-Sort沒有給用戶自定義的Map和Reduce函數的機會,所以不具有 MapReduce庫普遍的實用性。
River[2]提供了一個編程模 型:處理進程經過分佈式隊列傳送數據的方式進行互相通信。和MapReduce相似,River系統嘗試在不對等的硬件環境下,或者在系統顛簸的狀況下也 能提供近似平均的性能。River是經過精心調度硬盤和網絡的通信來平衡任務的完成時間。MapReduce庫採用了其它的方法。經過對編程模型進行限 制,MapReduce框架把問題分解成爲大量的「小」任務。這些任務在可用的worker集羣上動態的調度,這樣快速的worker就能夠執行更多的任 務。經過對編程模型進行限制,咱們可用在工做接近完成的時候調度備用任務,縮短在硬件配置不均衡的狀況下縮小整個操做完成的時間(好比有的機器性能差、或 者機器被某些操做阻塞了)。
BAD-FS[5]採用了和MapReduce徹底不一樣的編程模式,它是面向廣域網
(alex注:wide-area network)的。不過,這兩個系統有兩個基礎功能很相似。(1)兩個系統採用從新執行的方式來防止因爲失效致使的數據丟失。(2)兩個都使用數據本地化調度策略,減小網絡通信的數據量。
TACC[7]是一個用於簡化構造高可用性網絡服務的系統。和MapReduce同樣,它也依靠從新執行機制來實現的容錯處理。
八、結束語
MapReduce 編程模型在Google內部成功應用於多個領域。咱們把這種成功歸結爲幾個方面:首先,因爲MapReduce封裝了並行處理、容錯處理、數據本地化優 化、負載均衡等等技術難點的細節,這使得MapReduce庫易於使用。即使對於徹底沒有並行或者分佈式系統開發經驗的程序員而言;其次,大量不一樣類型的 問題均可以經過MapReduce簡單的解決。好比,MapReduce用於生成Google的網絡搜索服務所須要的數據、用來排序、用來數據挖掘、用於 機器學習,以及不少其它的系統;第三,咱們實現了一個在數千臺計算機組成的大型集羣上靈活部署運行的MapReduce。這個實現使得有效利用這些豐富的 計算資源變得很是簡單,所以也適合用來解決Google遇到的其餘不少須要大量計算的問題。
我 們也從MapReduce開發過程當中學到了很多東西。首先,約束編程模式使得並行和分佈式計算很是容易,也易於構造容錯的計算環境;其次,網絡帶寬是稀有 資源。大量的系統優化是針對減小網絡傳輸量爲目的的:本地優化策略使大量的數據從本地磁盤讀取,中間文件寫入本地磁盤、而且只寫一份中間文件也節約了網絡 帶寬;第三,屢次執行相同的任務能夠減小性能緩慢的機器帶來的負面影響(alex注:即硬件配置的不平衡),同時解決了因爲機器失效致使的數據丟失問題。
附錄A、單詞頻率統計
本節包含了一個完整的程序,用於統計在一組命令行指定的輸入文件中,每個不一樣的單詞出現頻率。
#include 「mapreduce/mapreduce.h」
// User’s map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),」1″);
}
}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into 「spec」
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format(「text」);
input->set_filepattern(argv[i]);
input->set_mapper_class(「WordCounter」);
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// …
MapReduceOutput* out = spec.output();
out->set_filebase(「/gfs/test/freq」);
out->set_num_tasks(100);
out->set_format(「text」);
out->set_reducer_class(「Adder」);
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class(「Adder」);
// Tuning parameters: use at most 2000 // machines and 100 MB of memory per task spec.set_machines(2000); spec.set_map_megabytes(100); spec.set_reduce_megabytes(100); // Now run it MapReduceResult result; if (!MapReduce(spec, &result)) abort(); // Done: ‘result’ structure contains info // about counters, time taken, number of // machines used, etc. return 0; }