MapReduce:超大機羣上的簡單數據處理程序員
摘要web
MapReduce是一個編程模型,和處理,產生大數據集的相關實現.用戶指定一個map函數處理一個key/value對,從而產生中間的key/value對集.而後再指定一個reduce函數合併全部的具備相同中間key的中間value.下面將列舉許多能夠用這個模型來表示的現實世界的工做.數據庫
以這種方式寫的程序能自動的在大規模的普通機器上實現並行化.這個運行時系統關心這些細節:分割輸入數據,在機羣上的調度,機器的錯誤處理,管理機器之間必要的通訊.這樣就可讓那些沒有並行分佈式處理系統經驗的程序員利用大量分佈式系統的資源.編程
咱們的MapReduce實現運行在規模能夠靈活調整的由普通機器組成的機羣上,一個典型的MapReduce計算處理幾千臺機器上的以TB計算的數據.程序員發現這個系統很是好用:已經實現了數以百計的MapReduce程序,天天在Google的機羣上都有1000多個MapReduce程序在執行.設計模式
1.介紹api
在過去的5年裏,做者和Google的許多人已經實現了數以百計的爲專門目的而寫的計算來處理大量的原始數據,好比,爬行的文檔,Web請求日誌,等等.爲了計算各類類型的派生數據,好比,倒排索引,Web文檔的圖結構的各類表示,每一個主機上爬行的頁面數量的概要,天天被請求數量最多的集合,等等.不少這樣的計算在概念上很容易理解.然而,輸入的數據量很大,而且只有計算被分佈在成百上千的機器上才能在能夠接受的時間內完成.怎樣並行計算,分發數據,處理錯誤,全部這些問題綜合在一塊兒,使得本來很簡介的計算,由於要大量的複雜代碼來處理這些問題,而變得讓人難以處理.數組
做爲對這個複雜性的迴應,咱們設計一個新的抽象模型,它讓咱們表示咱們將要執行的簡單計算,而隱藏並行化,容錯,數據分佈,負載均衡的那些雜亂的細節,在一個庫裏.咱們的抽象模型的靈感來自Lisp和許多其餘函數語言的map和reduce的原始表示.咱們認識到咱們的許多計算都包含這樣的操做:在咱們輸入數據的邏輯記錄上應用map操做,來計算出一箇中間key/value對集,在全部具備相同key的value上應用reduce操做,來適當的合併派生的數據.功能模型的使用,再結合用戶指定的map和reduce操做,讓咱們能夠很是容易的實現大規模並行化計算,和使用再次執行做爲初級機制來實現容錯.緩存
這個工做的主要貢獻是經過簡單有力的接口來實現自動的並行化和大規模分佈式計算,結合這個接口的實現來在大量普通的PC機上實現高性能計算.服務器
第二部分描述基本的編程模型,而且給一些例子.第三部分描述符合咱們的基於集羣的計算環境的MapReduce的接口的實現.第四部分描述咱們以爲編程模型中一些有用的技巧.第五部分對於各類不一樣的任務,測量咱們實現的性能.第六部分探究在Google內部使用MapReduce做爲基礎來重寫咱們的索引系統產品.第七部分討論相關的,和將來的工做.網絡
2.編程模型
計算利用一個輸入key/value對集,來產生一個輸出key/value對集.MapReduce庫的用戶用兩個函數表達這個計算:map和reduce.
用戶自定義的map函數,接受一個輸入對,而後產生一箇中間key/value對集.MapReduce庫把全部具備相同中間key I的中間value聚合在一塊兒,而後把它們傳遞給reduce函數.
用戶自定義的reduce函數,接受一箇中間key I和相關的一個value集.它合併這些value,造成一個比較小的value集.通常的,每次reduce調用只產生0或1個輸出value.經過一個迭代器把中間value提供給用戶自定義的reduce函數.這樣可使咱們根據內存來控制value列表的大小.
2.1 實例
考慮這個問題:計算在一個大的文檔集合中每一個詞出現的次數.用戶將寫和下面相似的僞代碼:
map(String key,String value):
//key:文檔的名字
//value:文檔的內容
for each word w in value:
EmitIntermediate(w,"1");
reduce(String key,Iterator values):
//key:一個詞
//values:一個計數列表
int result=0;
for each v in values:
result+=ParseInt(v);
Emit(AsString(resut));
map函數產生每一個詞和這個詞的出現次數(在這個簡單的例子裏就是1).reduce函數把產生的每個特定的詞的計數加在一塊兒.
另外,用戶用輸入輸出文件的名字和可選的調節參數來填充一個mapreduce規範對象.用戶而後調用MapReduce函數,並把規範對象傳遞給它.用戶的代碼和MapReduce庫連接在一塊兒(用C++實現).附錄A包含這個實例的所有文本.
2.2類型
即便前面的僞代碼寫成了字符串輸入和輸出的term格式,可是概念上用戶寫的map和reduce函數有關聯的類型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
例如,輸入的key,value和輸出的key,value的域不一樣.此外,中間key,value和輸出key,values的域相同.
咱們的C++實現傳遞字符串來和用戶自定義的函數交互,並把它留給用戶的代碼,來在字符串和適當的類型間進行轉換.
2.3更多實例
這裏有一些讓人感興趣的簡單程序,能夠容易的用MapReduce計算來表示.
分佈式的Grep(UNIX工具程序, 可作文件內的字符串查找):若是輸入行匹配給定的樣式,map函數就輸出這一行.reduce函數就是把中間數據複製到輸出.
計算URL訪問頻率:map函數處理web頁面請求的記錄,輸出(URL,1).reduce函數把相同URL的value都加起來,產生一個(URL,記錄總數)的對.
倒轉網絡連接圖:map函數爲每一個連接輸出(目標,源)對,一個URL叫作目標,包含這個URL的頁面叫作源.reduce函數根據給定的相關目標URLs鏈接全部的源URLs造成一個列表,產生(目標,源列表)對.
每一個主機的術語向量:一個術語向量用一個(詞,頻率)列表來概述出如今一個文檔或一個文檔集中的最重要的一些詞.map函數爲每個輸入文檔產生一個(主機名,術語向量)對(主機名來自文檔的URL).reduce函數接收給定主機的全部文檔的術語向量.它把這些術語向量加在一塊兒,丟棄低頻的術語,而後產生一個最終的(主機名,術語向量)對.
倒排索引:map函數分析每一個文檔,而後產生一個(詞,文檔號)對的序列.reduce函數接受一個給定詞的全部對,排序相應的文檔IDs,而且產生一個(詞,文檔ID列表)對.全部的輸出對集造成一個簡單的倒排索引.它能夠簡單的增長跟蹤詞位置的計算.
分佈式排序:map函數從每一個記錄提取key,而且產生一個(key,record)對.reduce函數不改變任何的對.這個計算依賴分割工具(在4.1描述)和排序屬性(在4.2描述).
3實現
MapReduce接口可能有許多不一樣的實現.根據環境進行正確的選擇.例如,一個實現對一個共享內存較小的機器是合適的,另外的適合一個大NUMA的多處理器的機器,而有的適合一個更大的網絡機器的集合.
這部分描述一個在Google普遍使用的計算環境的實現:用交換機鏈接的普通PC機的大機羣.咱們的環境是:
1.Linux操做系統,雙處理器,2-4GB內存的機器.
2.普通的網絡硬件,每一個機器的帶寬或者是百兆或者千兆,可是平均小於所有帶寬的一半.
3.由於一個機羣包含成百上千的機器,全部機器會常常出現問題.
4.存儲用直接連到每一個機器上的廉價IDE硬盤.一個從內部文件系統發展起來的分佈式文件系統被用來管理存儲在這些磁盤上的數據.文件系統用複製的方式在不可靠的硬件上來保證可靠性和有效性.
5.用戶提交工做給調度系統.每一個工做包含一個任務集,每一個工做被調度者映射到機羣中一個可用的機器集上.
3.1執行預覽
經過自動分割輸入數據成一個有M個split的集,map調用被分佈到多臺機器上.輸入的split可以在不一樣的機器上被並行處理.經過用分割函數分割中間key,來造成R個片(例如,hash(key) mod R),reduce調用被分佈到多臺機器上.分割數量(R)和分割函數由用戶來指定.
圖1顯示了咱們實現的MapReduce操做的所有流程.當用戶的程序調用MapReduce的函數的時候,將發生下面的一系列動做(下面的數字和圖1中的數字標籤相對應):
1.在用戶程序裏的MapReduce庫首先分割輸入文件成M個片,每一個片的大小通常從 16到64MB(用戶能夠經過可選的參數來控制).而後在機羣中開始大量的拷貝程序.
2.這些程序拷貝中的一個是master,其餘的都是由master分配任務的worker.有M 個map任務和R個reduce任務將被分配.管理者分配一個map任務或reduce任務給一個空閒的worker.
3.一個被分配了map任務的worker讀取相關輸入split的內容.它從輸入數據中分析出key/value對,而後把key/value對傳遞給用戶自定義的map函數.由map函數產生的中間key/value對被緩存在內存中.
4.緩存在內存中的key/value對被週期性的寫入到本地磁盤上,經過分割函數把它們寫入R個區域.在本地磁盤上的緩存對的位置被傳送給master,master負責把這些位置傳送給reduce worker.
5.當一個reduce worker獲得master的位置通知的時候,它使用遠程過程調用來從map worker的磁盤上讀取緩存的數據.當reduce worker讀取了全部的中間數據後,它經過排序使具備相同key的內容聚合在一塊兒.由於許多不一樣的key映射到相同的reduce任務,因此排序是必須的.若是中間數據比內存還大,那麼還須要一個外部排序.
6.reduce worker迭代排過序的中間數據,對於遇到的每個惟一的中間key,它把key和相關的中間value集傳遞給用戶自定義的reduce函數.reduce函數的輸出被添加到這個reduce分割的最終的輸出文件中.
7.當全部的map和reduce任務都完成了,管理者喚醒用戶程序.在這個時候,在用戶程序裏的MapReduce調用返回到用戶代碼.
在成功完成以後,mapreduce執行的輸出存放在R個輸出文件中(每個reduce任務產生一個由用戶指定名字的文件).通常,用戶不須要合併這R個輸出文件成一個文件--他們常常把這些文件看成一個輸入傳遞給其餘的MapReduce調用,或者在能夠處理多個分割文件的分佈式應用中使用他們.
3.2master數據結構
master保持一些數據結構.它爲每個map和reduce任務存儲它們的狀態(空閒,工做中,完成),和worker機器(非空閒任務的機器)的標識.
master就像一個管道,經過它,中間文件區域的位置從map任務傳遞到reduce任務.所以,對於每一個完成的map任務,master存儲由map任務產生的R箇中間文件區域的大小和位置.當map任務完成的時候,位置和大小的更新信息被接受.這些信息被逐步增長的傳遞給那些正在工做的reduce任務.
3.3容錯
由於MapReduce庫被設計用來使用成百上千的機器來幫助處理很是大規模的數據,因此這個庫必需要能很好的處理機器故障.
worker故障
master週期性的ping每一個worker.若是master在一個肯定的時間段內沒有收到worker返回的信息,那麼它將把這個worker標記成失效.由於每個由這個失效的worker完成的map任務被從新設置成它初始的空閒狀態,因此它能夠被安排給其餘的worker.一樣的,每個在失敗的worker上正在運行的map或reduce任務,也被從新設置成空閒狀態,而且將被從新調度.
在一個失敗機器上已經完成的map任務將被再次執行,由於它的輸出存儲在它的磁盤上,因此不可訪問.已經完成的reduce任務將不會再次執行,由於它的輸出存儲在全局文件系統中.
當一個map任務首先被worker A執行以後,又被B執行了(由於A失效了),從新執行這個狀況被通知給全部執行reduce任務的worker.任何尚未從A讀數據的reduce任務將從worker B讀取數據.
MapReduce能夠處理大規模worker失敗的狀況.例如,在一個MapReduce操做期間,在正在運行的機羣上進行網絡維護引發80臺機器在幾分鐘內不可訪問了,MapReduce master只是簡單的再次執行已經被不可訪問的worker完成的工做,繼續執行,最終完成這個MapReduce操做.
master失敗
能夠很容易的讓管理者週期的寫入上面描述的數據結構的checkpoints.若是這個master任務失效了,能夠從上次最後一個checkpoint開始啓動另外一個master進程.然而,由於只有一個master,因此它的失敗是比較麻煩的,所以咱們如今的實現是,若是master失敗,就停止MapReduce計算.客戶能夠檢查這個狀態,而且能夠根據須要從新執行MapReduce操做.
在錯誤面前的處理機制
當用戶提供的map和reduce操做對它的輸出值是肯定的函數時,咱們的分佈式實現產生,和所有程序沒有錯誤的順序執行同樣,相同的輸出.
咱們依賴對map和reduce任務的輸出進行原子提交來完成這個性質.每一個工做中的任務把它的輸出寫到私有臨時文件中.一個reduce任務產生一個這樣的文件,而一個map任務產生R個這樣的文件(一個reduce任務對應一個文件).當一個map任務完成的時候,worker發送一個消息給master,在這個消息中包含這R個臨時文件的名字.若是master從一個已經完成的map任務再次收到一個完成的消息,它將忽略這個消息.不然,它在master的數據結構裏記錄這R個文件的名字.
當一個reduce任務完成的時候,這個reduce worker原子的把臨時文件重命名成最終的輸出文件.若是相同的reduce任務在多個機器上執行,多個重命名調用將被執行,併產生相同的輸出文件.咱們依賴由底層文件系統提供的原子重命名操做來保證,最終的文件系統狀態僅僅包含一個reduce任務產生的數據.
咱們的map和reduce操做大部分都是肯定的,而且咱們的處理機制等價於一個順序的執行的這個事實,使得程序員能夠很容易的理解程序的行爲.當map或/和reduce操做是不肯定的時候,咱們提供雖然比較弱可是合理的處理機制.當在一個非肯定操做的前面,一個reduce任務R1的輸出等價於一個非肯定順序程序執行產生的輸出.然而,一個不一樣的reduce任務R2的輸出也許符合一個不一樣的非肯定順序程序執行產生的輸出.
考慮map任務M和reduce任務R1,R2的狀況.咱們設定e(Ri)爲已經提交的Ri的執行(有且僅有一個這樣的執行).這個比較弱的語義出現,由於e(R1)也許已經讀取了由M的執行產生的輸出,而e(R2)也許已經讀取了由M的不一樣執行產生的輸出.
3.4存儲位置
在咱們的計算機環境裏,網絡帶寬是一個至關缺少的資源.咱們利用把輸入數據(由GFS管理)存儲在機器的本地磁盤上來保存網絡帶寬.GFS把每一個文件分紅64MB的一些塊,而後每一個塊的幾個拷貝存儲在不一樣的機器上(通常是3個拷貝).MapReduce的master考慮輸入文件的位置信息,而且努力在一個包含相關輸入數據的機器上安排一個map任務.若是這樣作失敗了,它嘗試在那個任務的輸入數據的附近安排一個map任務(例如,分配到一個和包含輸入數據塊在一個switch裏的worker機器上執行).當運行巨大的MapReduce操做在一個機羣中的一部分機器上的時候,大部分輸入數據在本地被讀取,從而不消耗網絡帶寬.
3.5任務粒度
象上面描述的那樣,咱們細分map階段成M個片,reduce階段成R個片.M和R應當比worker機器的數量大許多.每一個worker執行許多不一樣的工做來提升動態負載均衡,也能夠加速從一個worker失效中的恢復,這個機器上的許多已經完成的map任務能夠被分配到全部其餘的worker機器上.
在咱們的實現裏,M和R的範圍是有大小限制的,由於master必須作O(M+R)次調度,而且保存O(M*R)個狀態在內存中.(這個因素使用的內存是不多的,在O(M*R)個狀態片裏,大約每一個map任務/reduce任務對使用一個字節的數據).
此外,R常常被用戶限制,由於每個reduce任務最終都是一個獨立的輸出文件.實際上,咱們傾向於選擇M,以便每個單獨的任務大概都是16到64MB的輸入數據(以便上面描述的位置優化是最有效的),咱們把R設置成咱們但願使用的worker機器數量的小倍數.咱們常常執行MapReduce計算,在M=200000,R=5000,使用2000臺工做者機器的狀況下.
3.6備用任務
一個落後者是延長MapReduce操做時間的緣由之一:一個機器花費一個異乎尋常地的長時間來完成最後的一些map或reduce任務中的一個.有不少緣由可能產生落後者.例如,一個有壞磁盤的機器常常發生能夠糾正的錯誤,這樣就使讀性能從30MB/s下降到3MB/s.機羣調度系統也許已經安排其餘的任務在這個機器上,因爲計算要使用CPU,內存,本地磁盤,網絡帶寬的緣由,引發它執行MapReduce代碼很慢.咱們最近遇到的一個問題是,一個在機器初始化時的Bug引發處理器緩存的失效:在一個被影響的機器上的計算性能有上百倍的影響.
咱們有一個通常的機制來減輕這個落後者的問題.當一個MapReduce操做將要完成的時候,master調度備用進程來執行那些剩下的還在執行的任務.不管是原來的仍是備用的執行完成了,工做都被標記成完成.咱們已經調整了這個機制,一般只會佔用多幾個百分點的機器資源.咱們發現這能夠顯著的減小完成大規模MapReduce操做的時間.做爲一個例子,將要在5.3描述的排序程序,在關閉掉備用任務的狀況下,要比有備用任務的狀況下多花44%的時間.
4技巧
儘管簡單的map和reduce函數的功能對於大多數需求是足夠的了,可是咱們開發了一些有用的擴充.這些將在這個部分描述.
4.1分割函數
MapReduce用戶指定reduce任務和reduce任務須要的輸出文件的數量.在中間key上使用分割函數,使數據分割後經過這些任務.一個缺省的分割函數使用hash方法(例如,hash(key) mod R).這個致使很是平衡的分割.而後,有的時候,使用其餘的key分割函數來分割數據有很是有用的.例如,有時候,輸出的key是URLs,而且咱們但願每一個主機的全部條目保持在同一個輸出文件中.爲了支持像這樣的狀況,MapReduce庫的用戶能夠提供專門的分割函數.例如,使用"hash(Hostname(urlkey)) mod R"做爲分割函數,使全部來自同一個主機的URLs保存在同一個輸出文件中.
4.2順序保證
咱們保證在一個給定的分割裏面,中間key/value對以key遞增的順序處理.這個順序保證可使每一個分割產出一個有序的輸出文件,當輸出文件的格式須要支持有效率的隨機訪問key的時候,或者對輸出數據集再做排序的時候,就很容易.
4.3combiner函數
在某些狀況下,容許中間結果key重複會佔據至關的比重,而且用戶定義的reduce函數
知足結合律和交換律.一個很好的例子就是在2.1部分的詞統計程序.由於詞頻率傾向於一個zipf分佈(齊夫分佈),每一個map任務將產生成百上千個這樣的記錄<the,1>.全部的這些計數將經過網絡被傳輸到一個單獨的reduce任務,而後由reduce函數加在一塊兒產生一個數字.咱們容許用戶指定一個可選的combiner函數,先在本地進行合併一下,而後再經過網絡發送.
在每個執行map任務的機器上combiner函數被執行.通常的,相同的代碼被用在combiner和reduce函數.在combiner和reduce函數之間惟一的區別是MapReduce庫怎樣控制函數的輸出.reduce函數的輸出被保存最終輸出文件裏.combiner函數的輸出被寫到中間文件裏,而後被髮送給reduce任務.
部分使用combiner能夠顯著的提升一些MapReduce操做的速度.附錄A包含一個使用combiner函數的例子.
4.4輸入輸出類型
MapReduce庫支持以幾種不一樣的格式讀取輸入數據.例如,文本模式輸入把每一行看做是一個key/value對.key是文件的偏移量,value是那一行的內容.其餘普通的支持格式以key的順序存儲key/value對序列.每個輸入類型的實現知道怎樣把輸入分割成對每一個單獨的map任務來講是有意義的(例如,文本模式的範圍分割確保僅僅在每行的邊界進行範圍分割).雖然許多用戶僅僅使用不多的預約意輸入類型的一個,可是用戶能夠經過提供一個簡單的reader接口來支持一個新的輸入類型.
一個reader沒必要要從文件裏讀數據.例如,咱們能夠很容易的定義它從數據庫裏讀記錄,或從內存中的數據結構讀取.
4.5反作用
有的時候,MapReduce的用戶發如今map操做或/和reduce操做時產生輔助文件做爲一個附加的輸出是很方便的.咱們依靠應用程序寫來使這個反作用成爲原子的.通常的,應用程序寫一個臨時文件,而後一旦這個文件所有產生完,就自動的被重命名.
對於單個任務產生的多個輸出文件來講,咱們沒有提供其上的兩階段提交的原子操做支持.所以,一個產生須要交叉文件鏈接的多個輸出文件的任務,應該使肯定性的任務.不過這個限制在實際的工做中並非一個問題.
4.6跳過錯誤記錄
有的時候由於用戶的代碼裏有bug,致使在某一個記錄上map或reduce函數忽然crash掉.這樣的bug使得MapReduce操做不能完成.雖然通常是修復這個bug,可是有時候這是不現實的;也許這個bug是在源代碼不可獲得的第三方庫裏.有的時候也能夠忽略一些記錄,例如,當在一個大的數據集上進行統計分析.咱們提供一個可選的執行模式,在這個模式下,MapReduce庫檢測那些記錄引發的crash,而後跳過那些記錄,來繼續執行程序.
每一個worker程序安裝一個信號處理器來獲取內存段異常和總線錯誤.在調用一個用戶自定義的map或reduce操做以前,MapReduce庫把記錄的序列號存儲在一個全局變量裏.若是用戶代碼產生一個信號,那個信號處理器就會發送一個包含序號的"last gasp"UDP包給MapReduce的master.當master不止一次看到同一個記錄的時候,它就會指出,當相關的map或reduce任務再次執行的時候,這個記錄應當被跳過.
4.7本地執行
調試在map或reduce函數中問題是很困難的,由於實際的計算髮生在一個分佈式的系統中,常常是有一個master動態的分配工做給幾千臺機器.爲了簡化調試和測試,咱們開發了一個可替換的實現,這個實如今本地執行全部的MapReduce操做.用戶能夠控制執行,這樣計算能夠限制到特定的map任務上.用戶以一個標誌調用他們的程序,而後能夠容易的使用他們認爲好用的任何調試和測試工具(例如,gdb).
4.8狀態信息
master運行一個HTTP服務器,而且能夠輸出一組情況頁來供人們使用.狀態頁顯示計算進度,象多少個任務已經完成,多少個還在運行,輸入的字節數,中間數據字節數,輸出字節數,處理百分比,等等.這個頁也包含到標準錯誤的連接,和由每一個任務產生的標準輸出的連接.用戶能夠根據這些數據預測計算須要花費的時間,和是否須要更多的資源.當計算比預期的要慢不少的時候,這些頁面也能夠被用來判斷是否是這樣.
此外,最上面的狀態頁顯示已經有多少個工做者失敗了,和當它們失敗的時候,那個map和reduce任務正在運行.當試圖診斷在用戶代碼裏的bug時,這個信息也是有用的.
4.9計數器
MapReduce庫提供一個計數器工具,來計算各類事件的發生次數.例如,用戶代碼想要計算全部處理的詞的個數,或者被索引的德文文檔的數量.
爲了使用這個工具,用戶代碼建立一個命名的計數器對象,而後在map或/和reduce函數裏適當的增長計數器.例如:
Counter * uppercase;
uppercase=GetCounter("uppercase");
map(String name,String contents):
for each word w in contents:
if(IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w,"1");
來自不一樣worker機器上的計數器值被週期性的傳送給master(在ping迴應裏).master把來自成功的map和reduce任務的計數器值加起來,在MapReduce操做完成的時候,把它返回給用戶代碼.當前計數器的值也被顯示在master狀態頁裏,以便人們能夠查看實際的計算進度.當計算計數器值的時候消除重複執行的影響,避免數據的累加.(在備用任務的使用,和因爲出錯的從新執行,能夠產生重複執行)
有些計數器值被MapReduce庫自動的維護,好比,被處理的輸入key/value對的數量,和被產生的輸出key/value對的數量.
用戶發現計數器工具對於檢查MapReduce操做的完整性頗有用.例如,在一些MapReduce操做中,用戶代碼也許想要確保輸出對的數量徹底等於輸入對的數量,或者處理過的德文文檔的數量是在所有被處理的文檔數量中屬於合理的範圍.
5性能
在本節,咱們用在一個大型集羣上運行的兩個計算來衡量MapReduce的性能.一個計算用來在一個大概1TB的數據中查找特定的匹配串.另外一個計算排序大概1TB的數據.
這兩個程序表明了MapReduce的用戶實現的真實的程序的一個大子集.一類是,把數據從一種表示轉化到另外一種表示.另外一類是,從一個大的數據集中提取少許的關心的數據.
5.1機羣配置
全部的程序在包含大概1800臺機器的機羣上執行.機器的配置是:2個2G的Intel Xeon超線程處理器,4GB內存,兩個160GB IDE磁盤,一個千兆網卡.這些機器部署在一個由兩層的,樹形交換網絡中,在根節點上大概有100到2000G的帶寬.全部這些機器都有相同的部署(對等部署),所以任意兩點之間的來回時間小於1毫秒.
在4GB的內存裏,大概有1-1.5GB被用來運行在機羣中其餘的任務.這個程序是在週末的下午開始執行的,這個時候CPU,磁盤,網絡基本上是空閒的.
5.2Grep
這個Grep程序掃描大概10^10個,每一個100字節的記錄,查找比較少的3字符的查找串(這個查找串出如今92337個記錄中).輸入數據被分割成大概64MB的片(M=15000),所有 的輸出存放在一個文件中(R=1).
圖2顯示計算過程隨時間變化的狀況.Y軸表示輸入數據被掃描的速度.隨着更多的機羣被分配給這個MapReduce計算,速度在逐步的提升,當有1764個worker的時候這個速度達到最高的30GB/s.當map任務完成的時候,速度開始降低,在計算開始後80秒,輸入的速度降到0.這個計算持續的時間大概是150秒.這包括了前面大概一分鐘的啓動時間.啓動時間用來把程序傳播到全部的機器上,等待GFS打開1000個輸入文件,獲得必要的位置優化信息.
5.3排序
這個sort程序排序10^10個記錄,每一個記錄100個字節(大概1TB的數據).這個程序是模仿TeraSort的.
這個排序程序只包含不到50行的用戶代碼.其中有3行map函數用來從文本行提取10字節的排序key,而且產生一個由這個key和原始文本行組成的中間key/value對.咱們使用一個內置的Identity函數做爲reduce操做.這個函數直接把中間key/value對做爲輸出的key/value對.最終的排序輸出寫到一個2路複製的GFS文件中(也就是,程序的輸出會寫2TB的數據).
象之前同樣,輸入數據被分割成64MB的片(M=15000).咱們把排序後的輸出寫到4000個文件中(R=4000).分區函數使用key的原始字節來把數據分區到R個小片中.
咱們以這個基準的分割函數,知道key的分佈狀況.在通常的排序程序中,咱們會增長一個預處理的MapReduce操做,這個操做用於採樣key的狀況,而且用這個採樣的key的分佈狀況來計算對最終排序處理的分割點。
圖3(a)顯示這個排序程序的正常執行狀況.左上圖顯示輸入數據的讀取速度.這個速度最高到達13GB/s,而且在不到200秒全部map任務完成以後迅速滑落到0.注意到這個輸入速度小於Grep.這是由於這個排序map任務花費大概一半的時間和帶寬,來把中間數據寫到本地硬盤中.而Grep相關的中間數據能夠忽略不計.
左中圖顯示數據經過網絡從map任務傳輸給reduce任務的速度.當第一個map任務完成後,這個排序過程就開始了.圖示上的第一個高峯是啓動了第一批大概1700個reduce任務(整個MapReduce任務被分配到1700臺機器上,每一個機器一次只執行一個reduce任務).大概開始計算後的300秒,第一批reduce任務中的一些完成了,咱們開始執行剩下的reduce任務.所有的排序過程持續了大概600秒的時間.
左下圖顯示排序後的數據被reduce任務寫入最終文件的速度.由於機器忙於排序中間數據,因此在第一個排序階段的結束和寫階段的開始有一個延遲.寫的速度大概是2-4GB/s.大概開始計算後的850秒寫過程結束.包括前面的啓動過程,所有的計算任務持續的891秒.這個和TeraSort benchmark的最高紀錄1057秒差很少.
須要注意的事情是:所以位置優化的緣由,不少數據都是從本地磁盤讀取的而沒有經過咱們有限帶寬的網絡,因此輸入速度比排序速度和輸出速度都要快.排序速度比輸出速度快的緣由是輸出階段寫兩個排序後數據的拷貝(咱們寫兩個副本的緣由是爲了可靠性和可用性).咱們寫兩份的緣由是由於底層文件系統的可靠性和可用性的要求.若是底層文件系統用相似容錯編碼(erasure coding)的方式,而不採用複製寫的方式,在寫盤階段能夠下降網絡帶寬的要求。
5.4備用任務的影響
在圖3(b)中,顯示咱們不用備用任務的排序程序的執行狀況.除了它有一個很長的幾乎沒有寫動做發生的尾巴外,執行流程和圖3(a)類似.在960秒後,只有5個reduce任務沒有完成.然而,就是這最後幾個落後者知道300秒後才完成.所有的計算任務執行了1283秒,多花了44%的時間.
5.5機器失效
在圖3(c)中,顯示咱們有意的在排序程序計算過程當中中止1746臺worker中的200臺機器上的程序的狀況.底層機羣調度者在這些機器上立刻從新開始新的worker程序(由於僅僅程序被中止,而機器仍然在正常運行).
由於已經完成的map工做丟失了(因爲相關的map worker被殺掉了),須要從新再做,因此worker死掉會致使一個負數的輸入速率.相關map任務的從新執行很快就從新執行了.整個計算過程在933秒內完成,包括了前邊的啓動時間(只比正常執行時間多了5%的時間).
6經驗
咱們在2003年的2月寫了MapReduce庫的第一個版本,而且在2003年的8月作了顯著的加強,包括位置優化,worker機器間任務執行的動態負載均衡,等等.從那個時候起,咱們驚奇的發現MapReduce函數庫普遍用於咱們平常處理的問題.它如今在Google內部各個領域內普遍應用,包括:
大規模機器學習問題
Google News和Froogle產品的機器問題.
提取數據產生一個流行查詢的報告(例如,Google Zeitgeist).
爲新的試驗和產品提取網頁的屬性(例如,從一個web頁的大集合中提取位置信息 用在位置查詢).
大規模的圖計算.
圖4顯示了咱們主要的源代碼管理系統中,隨着時間推移,MapReduce程序的顯著增長,從2003年早先時候的0個增加到2004年9月份的差很少900個不一樣的程序.MapReduce之因此這樣的成功,是由於他可以在不到半小時時間內寫出一個簡單的可以應用於上千臺機器的大規模併發程序,而且極大的提升了開發和原形設計的週期效率.而且,他可讓一個徹底沒有分佈式和/或並行系統經驗的程序員,可以很容易的利用大量的資源.
在每個任務結束的時候,MapReduce函數庫記錄使用的計算資源的統計信息.在圖1裏,咱們列出了2004年8月份在Google運行的一些MapReduce的工做的統計信息.
6.1大規模索引
到目前爲止,最成功的MapReduce的應用就是重寫了Google web 搜索服務所使用到的index系統.索引系統處理爬蟲系統抓回來的超大量的文檔集,這些文檔集保存在GFS文件裏.這些文檔的原始內容的大小,超過了20TB.索引程序是經過一系列的,大概5到10次MapReduce操做來創建索引.經過利用MapReduce(替換掉上一個版本的特別設計的分佈處理的索引程序版本)有這樣一些好處:
索引的代碼簡單,量少,容易理解,由於容錯,分佈式,並行處理都隱藏在MapReduce庫中了.例如,當使用MapReduce函數庫的時候,計算的代碼行數從原來的3800行C++代碼一下減小到大概700行代碼.
MapReduce的函數庫的性能已經很是好,因此咱們能夠把概念上不相關的計算步驟分開處理,而不是混在一塊兒以期減小在數據上的處理.這使得改變索引過程很容易.例如,咱們對老索引系統的一個小更改可能要好幾個月的時間,可是在新系統內,只須要花幾天時間就能夠了.
索引系統的操做更容易了,這是由於機器的失效,速度慢的機器,以及網絡失效都已經由MapReduce本身解決了,而不須要操做人員的交互.另外,咱們能夠簡單的經過對索引系統增長機器的方式提升處理性能.
7相關工做
不少系統都提供了嚴格的設計模式,而且經過對編程的嚴格限制來實現自動的並行計算.例如,一個結合函數能夠經過N個元素的數組的前綴在N個處理器上使用並行前綴計算在log N的時間內計算完.MapReduce是基於咱們的大型現實計算的經驗,對這些模型的一個簡化和精煉.而且,咱們還提供了基於上千臺處理器的容錯實現.而大部分併發處理系統都只在小規模的尺度上實現,而且機器的容錯仍是程序員來控制的.
Bulk Synchronous Programming以及一些MPI primitives提供了更高級別的抽象,能夠更容易寫出並行處理的程序.這些系統和MapReduce系統的不一樣之處在,MapReduce利用嚴格的編程模式自動實現用戶程序的併發處理,而且提供了透明的容錯處理.
咱們本地的優化策略是受active disks等技術的啓發,在active disks中,計算任務是儘可能推送到靠近本地磁盤的處理單元上,這樣就減小了經過I/O子系統或網絡的數據量.咱們在少許磁盤直接鏈接到普通處理機運行,來代替直接鏈接到磁盤控制器的處理機上,可是通常的步驟是類似的.
咱們的備用任務的機制和在Charlotte系統上的積極調度機制類似.這個簡單的積極調度的一個缺陷是,若是一個任務引發了一個重複性的失敗,那個整個計算將沒法完成.咱們經過在故障狀況下跳過故障記錄的機制,在某種程度上解決了這個問題.
MapReduce實現依賴一個內置的機羣管理系統來在一個大規模共享機器組上分佈和運行用戶任務.雖然這個不是本論文的重點,可是集羣管理系統在理念上和Condor等其餘系統是同樣的.
在MapReduce庫中的排序工具在操做上和NOW-Sort類似.源機器(map worker)分割將要被排序的數據,而後把它發送到R個reduce worker中的一個上.每一個reduce worker來本地排序它的數據(若是可能,就在內存中).固然,NOW-Sort沒有用戶自定義的map和reduce函數,使得咱們的庫能夠普遍的應用.
River提供一個編程模型,在這個模型下,處理進程能夠靠在分佈式的隊列上發送數據進行彼此通信.和MapReduce同樣,River系統嘗試提供對不一樣應用有近似平均的性能,即便在不對等的硬件環境下或者在系統顛簸的狀況下也能提供近似平均的性.River是經過精心調度硬盤和網絡的通信,來平衡任務的完成時間.MapReduce不和它不一樣.利用嚴格編程模型,MapReduce構架來把問題分割成大量的任務.這些任務被自動的在可用的worker上調度,以便速度快的worker能夠處理更多的任務.這個嚴格編程模型也讓咱們能夠在工做快要結束的時候安排冗餘的執行,來在非一致處理的狀況減小完成時間(好比,在有慢機或者阻塞的worker的時候).
BAD-FS是一個很MapReduce徹底不一樣的編程模型,它的目標是在一個廣闊的網絡上執行工做.然而,它們有兩個基本原理是相同的.(1)這兩個系統使用冗餘的執行來從由失效引發的數據丟失中恢復.(2)這兩個系統使用本地化調度策略,來減小經過擁擠的網絡鏈接發送的數據數量.
TACC是一個被設計用來簡化高有效性網絡服務結構的系統.和MapReduce同樣,它經過再次執行來實現容錯.
8結束語
MapReduce編程模型已經在Google成功的用在不一樣的目的.咱們把這個成功歸於如下幾個緣由:第一,這個模型使用簡單,甚至對沒有並行和分佈式經驗的程序員也是如此,由於它隱藏了並行化,容錯,位置優化和負載均衡的細節.第二,大量不一樣的問題能夠用MapReduce計算來表達.例如,MapReduce被用來,爲Google的產品web搜索服務,排序,數據挖掘,機器學習,和其餘許多系統,產生數據.第三,咱們已經在一個好幾千臺計算機的大型集羣上開發實現了這個MapReduce.這個實現使得對於這些機器資源的利用很是簡單,所以也適用於解決Google遇到的其餘不少須要大量計算的問題.
從這個工做中咱們也學習到了一些東西.首先,嚴格的編程模型使得並行化和分佈式計算簡單,而且也易於構造這樣的容錯計算環境.第二,網絡帶寬是系統的瓶頸.所以在咱們的系統中大量的優化目標是減小經過網絡發送的數據量,本地優化使用咱們從本地磁盤讀取數據,而且把中間數據寫到本地磁盤,以保留網絡帶寬.第三,冗餘的執行能夠用來減小速度慢的機器的影響,和控制機器失效和數據丟失.
感謝
Josh Levenberg校定和擴展了用戶級別的MapReduce API,而且結合他的適用經驗和其餘人的改進建議,增長了不少新的功能.MapReduce從GFS中讀取和寫入數據.咱們要感謝Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他們在開發GFS中的工做.咱們還感謝Percy Liang Olcan Sercinoglu 在開發用於MapReduce的集羣管理系統得工做.Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach爲本論文提出了寶貴的意見.OSDI的無名審閱者,以及咱們的審覈者Eric Brewer,在論文應當如何改進方面給出了有益的意見.最後,咱們感謝Google的工程部的全部MapReduce的用戶,感謝他們提供了有用的反饋,建議,以及錯誤報告等等.
A單詞頻率統計
本節包含了一個完整的程序,用於統計在一組命令行指定的輸入文件中,每個不一樣的單詞出現頻率.
#include "mapreduce/mapreduce.h"
//用戶map函數
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
//跳過前導空格
while ((i < n) && isspace(text[i]))
i++;
// 查找單詞的結束位置
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
//用戶的reduce函數
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
//迭代具備相同key的全部條目,而且累加它們的value
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
//提交這個輸入key的綜合
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// 把輸入文件列表存入"spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
//指定輸出文件:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// 可選操做:在map任務中作部分累加工做,以便節省帶寬
out->set_combiner_class("Adder");
// 調整參數: 使用2000臺機器,每一個任務100MB內存
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// 運行它
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// 完成: 'result'結構包含計數,花費時間,和使用機器的信息
return 0;
}