超大集羣的簡單數據處理程序員
轉自百度文庫web
Jeffrey Dean Sanjay Ghemawat算法
jeff@google.com , sanjay@google.com數據庫
Google , Inc.編程
MapReduce是一個編程模式,它是與處理/產生海量數據集的實現相關。用戶指定一個map函數,經過這個map函數處理key/value(鍵/值)對,而且產生一系列的中間key/value對,而且使用reduce函數來合併全部的具備相同key值的中間鍵值對中的值部分。現實生活中的不少任務的實現都是基於這個模式的,正如本文稍後會講述的那樣。api
使用這樣的函數形式實現的程序能夠自動分佈到一個由普通機器組成的超大集羣上併發執行。run-time系統會解決輸入數據的分佈細節,跨越機器集羣的程序執行調度,處理機器的失效,而且管理機器之間的通信請求。這樣的模式容許程序員能夠不須要有什麼併發處理或者分佈式系統的經驗,就能夠處理超大的分佈式系統得資源。服務器
咱們的MapReduce系統的實現運行在一個由普通機器組成的大型集羣上,而且有着很高的擴展性:一個典型的MapReduce計算處理一般分佈到上千臺機器上來處理上TB的數據。程序員會發現這樣的系統很容易使用:已經開發出來了上百個MapReduce程序,而且天天在Google的集羣上有上千個MapReduce job正在執行。網絡
在過去的5年內,Google的創造者和其餘人實現了上百個用於特別計算目的的程序來出來海量的原始數據,好比蠕蟲文檔,web請求log,等等,用於計算出不一樣的數據,好比降序索引,不一樣的圖示展現的web文檔,蠕蟲採集的每一個host的page數量摘要,給定日期內最經常使用的查詢等等。絕大部分計算都是概念上很簡潔的。不過,輸入的數據一般是很是巨大的,而且爲了能在合理時間內執行完畢,其上的計算必須分佈到上百個或者上千個計算機上去執行。如何併發計算,如何分佈數據,如何處理失敗等等相關問題合併在一塊兒就會致使本來簡單的計算掩埋在爲了解決這些問題而引入的很複雜的代碼中。數據結構
由於這種複雜度,咱們設計了一種新的東西來讓咱們可以方便處理這樣的簡單計算。這些簡單計算本來很簡單,可是因爲考慮到併發處理細節,容錯細節,以及數據分佈細節,負載均衡等等細節問題,而致使代碼很是複雜。因此咱們抽象這些公共的細節到一個lib中。這種抽象是源自Lisp以及其餘不少面向功能的語言的map和reduce概念。咱們認識到大部分操做都和map操做相關,這些map操做都是運算在輸入記錄的每一個邏輯」record」上,而且map操做爲了產生一組中間的key/value鍵值對,而且接着在全部相同key的中間結果上執行reduce操做,這樣就能夠合併適當的數據。咱們的函數模式是使用用戶定義的map和reduce操做,這樣可讓咱們併發執行大規模的運算,而且使用從新執行的方式做爲容錯的優先機制。併發
MapReduce的主要貢獻在於提供了一個簡單強大的接口,經過這個接口,能夠把大尺度的計算自動的併發和分佈執行。使用這個接口,能夠經過普通PC的巨大集羣,來達到極高的性能。
第二節講述了基本的編程模式,而且給出了一些例子。第三節講述了一個面向咱們基於集羣的計算環境的MapReduce的實現。第四節講述了一些咱們建議的精巧編程模式。第五節講述了在不一樣任務下咱們的MapReduce實現的性能比較。第六節講述了在Google中的MapReduce應用以及嘗試重寫了咱們產品的索引系統。第七節講述了相關工做和將來的工做。
咱們的運算處理一組輸入的(input)鍵值對(key/valuepairs),而且產生一組輸出的(output)鍵值對。MapReduce函數庫德用戶用兩個函數來表達這樣的計算:Map和Reduce。
Map函數,是用戶自定義的的函數,處理輸入的鍵值對,而且產生一組中間的(intermediate)鍵值對。MapReduce函數庫稽覈全部相同的中間鍵值鍵I的值,而且發送給Reduce函數進行處理。
Reduce函數一樣也是用戶提供的,它處理中間鍵值I,以及這個中間鍵值相關的值集合。這個函數合併這些值,最後造成一個相對較小的值集合。一般一個單次Reduce執行會產生0個或者1個輸出值。提供給Reduce函數的中間值是經過一個iterator來提供的。這就讓咱們能夠處理超過內存容量的值列表。
咱們考慮這樣一個例子,在很大的文檔集合中統計每個單詞出現的次數。咱們寫出相似以下的僞代碼:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map函數檢查每個單詞,而且對每個單詞增長1到其對應的計數器(在這個例子裏就是’1’).reduce函數把特定單詞的全部出現的次數進行合併。
此外,咱們還要寫代碼來對mapreduce specification對象進行賦值,設定輸入和輸出的文件名,以及設定一些參數。接着咱們調用MapReduce函數,把這個對象做爲參數調用過去。咱們把MapReduce函數庫(C++函數庫)和咱們的程序連接在一塊兒。附件1有完整的這個例子的代碼。
即便上邊的例子是用字符串做爲輸入和輸入出的,從概念上講,使用者提供的map和reduce函數有着以下相關類型:
map (k1,v1) à list(k2,v2)
reduce (k2,list(v2)) à list(v2)
也就是,輸入的鍵和值和輸出的鍵值是屬於不一樣的域的。進一步說,中間的鍵值是和輸出的鍵值屬於相同的域的。(好比map的輸出,就是做爲reduce的輸入)。
咱們的C++實現上,把字符串做爲用戶定義函數的輸入和輸出,由用戶代碼來本身識別字符串到合適的類型。
這裏有一些簡單有趣的例子,均可以簡單的經過MapReduce計算模型來展現:
分佈式Grep: 若是map函數檢查輸入行,知足條件的時候,map函數就把本行輸出。reduce函數就是一個直通函數,簡單的把中間數據輸出就能夠了。
URL訪問頻率統計: map函數處理webpag請求和應答(URL,1)的log。Reduce函數把全部相同的URL的值合併,而且輸出一個成對的(URL,總個數)。
逆向Web-Link 圖: map函數輸出全部包含指向target URL的source網頁,用(target,source)這樣的結構對輸出。Reduce函數局和全部關聯相同target URL的source列表,而且輸出一個(target,list(source))這樣的結構。
主機關鍵向量指標(Term-Vector per Hosts): 關鍵詞向量指標簡而言之就是在一個文檔或者一組文檔中的重點次出現的頻率,用(word,frequency)表達。map函數計算每個輸入文檔(主機名字是從文檔的URL取出的)的關鍵詞向量,而後輸出(hostname,關鍵詞向量(Term-Vector))。reduce函數處理全部相同host的全部文檔關鍵詞向量。去掉不經常使用的關鍵詞,而且輸出最終的(hostname,關鍵詞向量)對。
逆序索引: map函數分析每個文檔,而且產生一個序列(word,documentID)組。reduce函數處理指定word的全部的序列組,而且對相關的document ID進行排序,輸出一個(word,list(document ID))組。全部的輸出組,組成一個簡單的逆序索引。經過這種方法能夠很容易保持關鍵詞在文檔庫中的位置。
分佈式排序: map函數從每條記錄中抽取關鍵字,而且產生(key,record)對。reduce函數原樣輸出全部的關鍵字對。這個算法是與4.1節描述的分佈式處理相關的,而且排序是在4.2節描述的。
MapReduce接口能夠有不少種不一樣的實現。應當根據不一樣的環境選擇不一樣的實現。好比,一個實現能夠適用於小型的共享內存的機器,另外一個實現多是基於大型NUMA多處理器系統,還可能有爲大規模計算機集羣的實現。
本屆描述了Google普遍使用的計算環境:用交換機網絡[4]鏈接的,由普通PC構成的超大集羣。在咱們的環境裏:
(1) 每一個節點一般是雙x86處理器,運行Linux,每臺機器2-4GB內存。
(2) 使用的網絡設備都是經常使用的。通常在節點上使用的是100M/或者千M網絡,通常狀況下都用不到一半的網絡帶寬。
(3) 一個cluster中經常有成百上千臺機器,因此,機器故障是屢見不鮮。
(4) 存儲時使用的便宜的IDE硬盤,直接放在每個機器上。而且有一個分佈式的文件系統來管理這些分佈在各個機器上的硬盤。文件系統經過複製的方法來在不可靠的硬件上保證可用性和可靠性。
(5) 用戶向調度系統提交請求。每個請求都包含一組任務,映射到這個計算機cluster裏的一組機器上執行。
Map操做經過把輸入數據進行分區(partition)(好比分爲M塊),就能夠分佈到不一樣的機器上執行了。輸入塊的拆成多塊,能夠並行在不一樣機器上執行。Reduce操做是經過對中間產生的key的分佈來進行分佈的,中間產生的key能夠根據某種分區函數進行分佈(好比hash(key) mod R),分佈成爲R塊。分區(R)的數量和分區函數都是由用戶指定的。
圖1是咱們實現的MapReduce操做的總體數據流。當用戶程序調用MapReduce函數,就會引發以下的操做(圖一中的數字標示和下表的數字標示相同)。
1. 用戶程序中的MapReduce函數庫首先把輸入文件分紅M塊,每塊大概16M到64M(能夠經過參數決定)。接着在cluster的機器上執行處理程序。
2. 這些分排的執行程序中有一個程序比較特別,它是主控程序master。剩下的執行程序都是做爲master分排工做的worker。總共有M個map任務和R個reduce任務須要分排。master選擇空閒的worker而且分配這些map任務或者reduce任務
3. 一個分配了map任務的worker讀取並處理相關的輸入小塊。他處理輸入的數據,而且將分析出的key/value對傳遞給用戶定義的map函數。map函數產生的中間結果key/value對暫時緩衝到內存。
4. 這些緩衝到內存的中間結果將被定時刷寫到本地硬盤,這些數據經過分區函數分紅R個區。這些中間結果在本地硬盤的位置信息將被髮送回master,而後這個master負責把這些位置信息傳送給reduce的worker。
5. 當master通知reduce的worker關於中間key/value對的位置時,他調用remote procedure來從map worker的本地硬盤上讀取緩衝的中間數據。當reduce的worker讀到了全部的中間數據,他就使用中間key進行排序,這樣可使得相同key的值都在一塊兒。由於有許多不一樣key的map都對應相同的reduce任務,因此,排序是必須的。若是中間結果集太大了,那麼就須要使用外排序。
6. reduce worker根據每個惟一中間key來遍歷全部的排序後的中間數據,而且把key和相關的中間結果值集合傳遞給用戶定義的reduce函數。reduce函數的對於本reduce區塊的輸出到一個最終的輸出文件。
7. 當全部的map任務和reduce任務都已經完成了的時候,master激活用戶程序。在這時候MapReduce返回用戶程序的調用點。
當這些成功結束之後,mapreduce的執行數據存放在總計R個輸出文件中(每一個都是由reduce任務產生的,這些文件名是用戶指定的)。一般,用戶不須要合併這R個輸出文件到一個文件,他們一般把這些文件做爲輸入傳遞到另外一個MapReduce調用,或者用另外一個分佈式應用來處理這些文件,而且這些分佈式應用把這些文件當作爲輸入文件因爲分區(partition)成爲的多個塊文件。
master須要保存必定的數據結構。對於每個map和reduce任務來講,都須要保存它的狀態(idle,in-progress或者completed),而且識別不一樣的worker機器(對於非idel的任務狀態)。
master是一個由map任務產生的中間區域文件位置信息到reduce任務的一個管道。所以,對於每個完成得map任務,master保存下來這個map任務產生的R中間區域文件信息的位置和大小。對於這個位置和大小信息是當接收到map任務完成得時候作的。這些信息是增量推送處處於in-progress狀態的reduce任務的worker上的。
因爲MapReduce函數庫是設計用於在成百上千臺機器上處理海量數據的,因此這個函數庫必須考慮到機器故障的容錯處理。
Worker失效的考慮
master會按期ping每個worker機器。若是在必定時間內沒有worker機器的返回,master就認爲這個worker失效了。全部這臺worker完成的map任務都被設置成爲他們的初始idel狀態,而且所以能夠被其餘worker所調度執行。相似的,全部這個機器上正在處理的map 任務或者reduce任務都被設置成爲idle狀態,能夠被其餘worker所從新執行。
在失效機器上的已經完成的map任務還須要再次從新執行,這是由於中間結果存放在這個失效的機器上,因此致使中間結果沒法訪問。已經完成的recude任務無需再次執行,由於他們的結果已經保存在全局的文件系統中了。
當map任務首先由Aworker執行,隨後被Bworker執行的時候(由於A失效了),全部執行reduce任務的worker都會被通知。全部尚未來得及從A上讀取數據的worker都會從B上讀取數據。
MapReduce能夠有效地支持到很大尺度的worker失效的狀況。好比,在一個MapReduce操做中,在一個網絡例行維護中,可能會致使每次大約有80臺機器在幾分鐘以內不能訪問。MapReduce的master制式簡單的把這些不能訪問的worker上的工做再執行一次,而且繼續調度進程,最後完成MapReduce的操做。
Master失效
在master中,按期會設定checkpoint,寫出master的數據結構。若是master任務失效了,能夠從上次最後一個checkpoint開始啓動另外一個master進程。不過,因爲只有一個master在運行,因此他若是失效就比較麻煩,所以咱們當前的實現上,是若是master失效了,就終止MapReduce執行。客戶端能夠檢測這種失效而且若是須要就從新嘗試MapReduce操做。
失效的處理設計
當用戶提供的map和reduce函數對於他們的輸入來講是肯定性的函數,咱們的分佈式的輸出就應當和在一個整個程序沒有失敗的連續執行相同。
咱們依靠對map和reduce任務的輸出進行原子提交來完成這樣的可靠性。每個in-progress任務把輸出寫道一個私有的臨時文件中。reduce任務產生一個這樣的文件,map任務產生R個這樣的任務(每個對應一個reduce任務)。當一個map任務完成的時候,worker發送一個消息給master,而且這個消息中包含了這個R臨時文件的名字。若是master又收到一個已經完成的map任務的完成消息,他就忽略這個消息。不然,他就在master數據結構中記錄這個R文件。
當一個reduce任務完成的時候,reduce worker自動把臨時輸出的文件名改成正式的輸出文件。若是再多臺機器上有相同的reduce任務執行,那麼就會有多個針對最終輸出文件的改名動做。咱們依靠文件系統提供的原子操做’更名字’,來保證最終的文件系統狀態中記錄的是其中一個reduce任務的輸出。
咱們的絕大部分map和reduce操做都是肯定性的,實際上在語義角度,這個map和reduce併發執行和順序執行市同樣的,這就使得程序員很容易推測程序行爲。當map和reduce操做是非肯定性的時候,咱們有稍弱的可是依舊是有道理的錯誤處理機制。對於非肯定性操做來講,特定reduce任務R1的輸出,與,非肯定性的順序執行的程序對R1的輸出是等價的。另外,另外一個reduce任務R2的輸出,是和另外一個順序執行的非肯定性程序對應的R2輸出相關的。
考慮map任務M和reduce任務R1,R2。咱們設定e(Ri)爲已經提交的Ri執行(有且僅有一個這樣的執行)。當e(R1)處理得是M的一次執行,而e(R2)是處理M的另外一次執行的時候,那麼就會致使稍弱的失效處理了。
在咱們的環境下,網絡帶寬資源是相對缺少的。咱們用盡可能讓輸入數據保存在構成集羣機器的本地硬盤上(經過GFS管理[8])的方式來減小網絡帶寬的開銷。GFS把文件分紅64M一塊,而且每一塊都有幾個拷貝(一般是3個拷貝),分佈到不一樣的機器上。MapReduce的master有輸入文件組的位置信息,而且嘗試分派map任務在對應包含了相關輸入數據塊的設備上執行。若是不能分配map任務到對應其輸入數據的機器上執行,他就嘗試分配map任務到儘可能靠近這個任務的輸入數據庫的機器上執行(好比,分配到一個和包含輸入數據塊在一個switch網段的worker機器上執行)。當在一個足夠大的cluster集羣上運行大型MapReduce操做的時候,大部分輸入數據都是在本地機器讀取的,他們消耗比較少的網絡帶寬。
若是上邊咱們講的,咱們把map階段拆分到M小塊,而且reduce階段拆分到R小塊執行。在理想狀態下,M和R應當比worker機器數量要多得多。每個worker機器都經過執行大量的任務來提升動態的負載均衡能力,而且可以加快故障恢復的速度:這個失效機器上執行的大量map任務均可以分佈到全部其餘worker機器上執行。
可是咱們的實現中,實際上對於M和R的取值有必定的限制,由於master必須執行O(M+R)次調度,而且在內存中保存O(M*R)個狀態。(對影響內存使用的因素仍是比較小的:O(M*R)塊狀態,大概每對map任務/reduce任務1個字節就能夠了)
進一步來講,用戶一般會指定R的值,由於每個reduce任務最終都是一個獨立的輸出文件。在實際中,咱們傾向於調整M的值,使得每個獨立任務都是處理大約16M到64M的輸入數據(這樣,上面描寫的本地優化策略會最有效),另外,咱們使R比較小,這樣使得R佔用很少的worker機器。咱們一般會用這樣的比例來執行MapReduce: M=200,000,R=5,000,使用2,000臺worker機器。
一般狀況下,一個MapReduce的總執行時間會受到最後的幾個」拖後腿」的任務影響:在計算過程當中,會有一個機器過了比正常執行時間長得多的時間尚未執行完map或者reduce任務,致使MapReduce總任務不能按時完成。出現拖後腿的狀況有不少緣由。好比:一個機器的硬盤有點問題,常常須要反覆讀取糾錯,而後把讀取輸入數據的性能從30M/s下降到1M/s。cluster調度系統已經在某臺機器上調度了其餘的任務,因此由於CPU/內存/本地硬盤/網絡帶寬等競爭的關係,致使執行MapReduce的代碼性能比較慢。咱們最近出現的一個問題是機器的啓動代碼有問題,致使關閉了cpu的cache:在這些機器上的任務性能有上百倍的影響。
咱們有一個通用的機制來減小拖後腿的狀況。當MapReduce操做接近完成的時候,master調度備用進程來執行那些剩下的in-progress狀態的任務。不管當最初的任務仍是backup任務執行完成的時候,都把這個任務標記成爲已經完成。咱們調優了這個機制,一般只會佔用多幾個百分點的機器資源。可是咱們發現這樣作之後對於減小超大MapReduce操做的總處理時間來講很是有效。例如,在5.3節描述的排序任務,在關閉掉備用任務的狀況下,要比有備用任務的狀況下多花44%的時間。
雖然簡單寫map和reduce函數實現基本功能就已經對大部分須要都足夠了,咱們仍是開發了一些有用的擴展,這些在本節詳細描述。
MapReduce的使用者經過指定(R)來給出reduce 任務/輸出文件的數量。他們處理的數據在這些任務上經過對中間結果key得分區函數來進行分區。缺省的分區函數時使用hash函數(例如hash(key)mod R)。這通常就能夠獲得分散均勻的分區。不過,在某些狀況下,對key用其餘的函數進行分區可能更有用。好比,某些狀況下key是URL,那麼咱們但願全部對單個host的入口URL都保存在相同的輸出文件。爲了支持相似的狀況,MapReduce函數庫可讓用戶提供一個特定的分區函數。好比使用hash(hostname(urlkey))mod R做爲分區函數,這樣可讓指向同一個hostname的URL分配到相同的輸出文件中。
咱們確保在給定的分區中,中間鍵值對key/value的處理順序是根據key增量處理的。這樣的順序保證能夠很容易生成每個分區有序的輸出文件,這對於輸出文件格式須要支持客戶端的對key的隨機存取的時候就頗有用,或者對輸出數據集再做排序就很容易。
在某些狀況下,容許中間結果key重複會佔據至關的比重,而且用戶定義的reduce函數知足結合律和交換律。好比2.1節的一個統計單詞出現次數的例子。因爲word的頻率趨勢符合Zipf 分佈(齊夫分佈),每個map任務都回產生成百上千的<the,1>這樣格式的記錄。全部這些記錄都經過網絡發送給一個單個的reduce任務,經過reduce函數進行相加,最後產生單個數字。咱們容許用戶指定一個可選的組合函數Combiner函數,先在本地進行合併如下,而後再經過網絡發送。
Combiner函數在每個map任務的機器上執行。一般這個combiner函數的代碼和reduce的代碼實現上都是同樣的。reduce函數和combiner函數惟一的不一樣就是MapReduce對於這兩個函數的輸出處理上不一樣。對於reduce函數的輸出是直接寫到最終的輸出文件。對於combiner函數來講,輸出是寫到中間文件,而且會被髮送到reduce任務中去。
部分使用combiner函數能夠顯著提升某些類型的MapReduce操做。附錄A有這樣的使用combiner的例子。
MapReduce函數庫提供了讀取幾種不一樣格式的輸入的支持。例如,」text」模式下,每行輸入都被當作一個key/value對:key是在文件的偏移量,value是行的內容。另外一個寵用格式保存了根據key進行排序key/value對的順序。每個輸入類型的實現都知道如何把輸入爲了分別得map任務而進行有效分隔(好比,text模式下的分隔就是要確保分隔的邊界只能按照行來進行分隔)。用戶能夠經過簡單的提供reader接口來進行新的輸入類型的支持。不過大部分用戶都只用一小部分預先定義的輸入類型。
reader函數不須要提供從文件讀取數據。例如,咱們很容易定義一個reader函數從數據庫讀取數據,或者從保存在內存中的數據結構中讀取數據。
相似的,咱們提供了一組用於輸出的類型,能夠產生不一樣格式的數據,而且用戶也能夠很簡單的增長新的輸出類型。
在某些狀況下,MapReduce的使用上,若是再map操做或者reduce操做時,增長輔助的輸出文件,會比較有用。咱們依靠程序來提供這樣的邊界原子操做。一般應用程序寫一個臨時文件而且用系統的原子操做:更名字操做,來再這個文件寫完的時候,一次把這個文件更名改掉。
對於單個任務產生的多個輸出文件來講,咱們沒有提供其上的兩階段提交的原子操做支持。所以,對於產生多個輸出文件的,對於跨文件有一致性要求的任務,都必須是肯定性的任務。這個限制到如今爲止尚未真正在實際中遇到過。
某些狀況下,用戶程序的代碼會讓map或者reduce函數在處理某些記錄的時候crash掉。這種狀況下MapReduce操做就不能完成。通常的作法是改掉bug而後再執行,可是有時候這種先改掉bug的方式不太可行;也許是由於bug是在第三方的lib裏邊,它的原代碼不存在等等。而且,不少時候,忽略一些記錄不處理也是能夠接受的,好比,在一個大數據集上進行統計分析的時候,就能夠忽略有問題的少許記錄。咱們提供了一種執行模式,在這種執行模式下,MapReduce會檢測到哪些記錄會致使肯定的crash,而且跳過這些記錄不處理,使得整個處理能繼續進行。
每個worker處理進程都有一個signal handler,能夠捕獲內存段異常和總線錯誤。在執行用戶map或者reduce操做以前,MapReduce函數庫經過全局變量保存記錄序號。若是用戶代碼產生了這個信號,signal handler因而用」最後一口氣」經過UDP包向master發送上次處理的最後一條記錄的序號。當master看到在這個特定記錄上,有不止一個失效的時候,他就標誌着條記錄須要被跳過,,而且在下次從新執行相關的Map或者Reduce任務的時候跳過這條記錄。
由於實際執行操做時分佈在系統中執行的,一般是在好幾千臺計算機上執行得,而且是由master機器進行動態調度的任務,因此對map和reduce函數的調試就比較麻煩。爲了可以讓調試方便,profiling和小規模測試,咱們開發了一套MapReduce的本地實現,也就是說,MapReduce函數庫在本地機器上順序執行全部的MapReduce操做。用戶能夠控制執行,這樣計算能夠限制到特定的map任務上。用戶能夠經過設定特別的標誌來執行他們的程序,同時也能夠很容易的使用調試和測試工具(好比gdb)等等。
master內部有一個HTTP服務器,而且能夠輸出狀態報告。狀態頁提供了計算的進度報告,好比有多少任務已經完成,有多少任務正在處理,輸入的字節數,中間數據的字節數,輸出的字節數,處理百分比,等等。這些頁面也包括了指向每一個任務輸出的標準錯誤和輸出的標準文件的鏈接。用戶能夠根據這些數據來預測計算須要大約執行多長時間,是否須要爲這個計算增長額外的計算資源。這些頁面也能夠用來分析爲什麼計算執行的會比預期的慢。
此外,最上層的狀態頁面也顯示了哪些worker失效了,以及他們失效的時候上面運行的map和reduce任務。這些信息對於調試用戶代碼中的bug頗有幫助。
MapReduce函數庫提供了用於統計不一樣事件發生次數的計數器。好比,用戶可能想統計全部已經索引的German文檔數量或者已經處理了多少單詞的數量,等等。
爲了使用這樣的特性,用戶代碼建立一個叫作counter的對象,而且在map和reduce函數中在適當的時候增長counter的值。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
這些counter的值,會定時從各個單獨的worker機器上傳遞給master(經過ping的應答包傳遞)。master把執行成功的map或者reduce任務的counter值進行累計,而且當MapReduce操做完成以後,返回給用戶代碼。當前counter值也會顯示在master的狀態頁面,這樣人能夠看到計算現場的進度。當累計counter的值的時候,master會檢查是否有對同一個map或者reduce任務的相同累計,避免累計重複。(backup任務或者機器失效致使的從新執行map任務或者reduce任務或致使這個counter重複執行,因此須要檢查,避免master進行重複統計)。
部分計數器的值是由MapReduce函數庫進行自動維持的,好比已經處理的輸入的key/value對的數量,或者輸出的key/value鍵值對等等。
counter特性對於MapReduce操做的完整性檢查很是有用。好比,在某些MapReduce操做中,用戶程序須要確保輸出的鍵值對精確的等於處理的輸入鍵值對,或者處理得German文檔數量是在處理的整個文檔數量中屬於合理範圍內。
在本節,咱們用在一個大型集羣上運行的兩個計算來衡量MapReduce的性能。一個計算用來在一個大概1TB的數據中查找特定的匹配串。另外一個計算排序大概1TB的數據。
這兩個程序表明了大量的用MapReduce實現的真實的程序的主要類型-一類是對數據進行洗牌,另外一類是從海量數據集中抽取少部分的關心的數據。
全部這些程序都是運行在一個大約有1800臺機器的集羣上。每臺機器配置2個2G Intel Xeon支持超線程的處理器, 4GB內存,兩個160GBIDE硬盤,一個千兆網卡。這些機器部署在一個由兩層的,樹形交換網絡中,在最上層大概有100-200G的聚合貸款。全部這些機器都有相同的部署(對等部署),所以任意兩點之間的來回時間小於1毫秒。
在4GB內存裏,大概有1-1.5G用於運行在集羣上的其餘任務。這個程序是在週末下午執行的,這時候的CPU,磁盤和網絡基本上屬於空閒狀態。
grep程序須要掃描大概10的10次方個由100個字節組成的記錄,查找比較少見的3個字符的查找串(這個查找串在92,337個記錄中存在)。輸入的記錄被拆分紅大約64M一個的塊(M=15000),整個輸出方在一個文件中(R=1)。
圖2表示了這個程序隨時間的處理過程。Y軸是輸入數據的處理速度。處理速度逐漸隨着參與MapReduce計算的機器增長而增長,當1764臺worker開始工做的時候,達到了30G/s的速度。當map任務結束的時候,在計算開始後80秒,輸入的速度降到0。整個計算過程從開始到結束一共花了大概150秒。這包括了大約一分鐘的開頭啓動部分。開頭的部分是用來把這個程序傳播到各個worker機器上的時間,而且等待GFS系統打開100個輸入文件集合而且得到相關的文件位置優化信息。
SORT程序排序10的10次方個100個字節組成的記錄(大概1TB的數據)。這個程序是仿製TeraSort benchmark[10]的。
sort程序是由不到50行用戶代碼組成。三行的map函數從文本行中解出10個字節的排序key,而且把這個key和原始行做爲中間結果key/value鍵值對輸出。咱們使用了一個內嵌的identitiy函數做爲reduce的操做。這個函數把中間結果key/value鍵值對不變的做爲輸出的key/value鍵值對。最終排序輸出寫到一個兩路複製的GFS文件中(就是說,程序的輸出會寫2TB的數據)。
就像前邊講的,輸入數據分紅64MB每塊(M=15000)。咱們把排序後的輸出分區成爲4000個文件(R=4000)。分區函數使用key的原始字節來吧數據分區到R個小塊中。
咱們這個benchmark中的分區函數自身知道key的分區狀況。一般對於排序程序來講,咱們會增長一個預處理的MapReduce操做,這個操做用於採樣key的狀況,而且用這個採樣的key的分佈狀況來計算對最終排序處理得分區點。
圖三是這個排序程序的正常執行過程。左上的圖表示了輸入數據讀取的速度。數據讀取速度會達到13G/s,而且在不到200秒全部map任務完成以後迅速滑落到0。咱們注意到數據讀取速度小於grep粒子。這是由於排序map任務劃了大概一半時間和I/O帶寬寫入中間輸出到本地硬盤。相對應的grep中間結果輸出幾乎能夠忽略不計。
左邊中間的圖是map任務把中間數據發送到reduce任務的網絡速度。這個排序過程自從第一個任務完成以後就開始了。圖示上的第一個高峯是啓動了第一批大概1700個reduce任務(整個MapReduce分佈到大概1700臺機器上,每臺機器一次大概執行1個reduce任務)。大概計算開始300秒之後,這些第一批reduce任務完成了,而且咱們開始執行剩下的reduce任務。全部這些排序任務會在計算開始後大概600秒結束。
左下的圖表示reduce任務把排序後的數據寫到最終的輸出文件的速度。在第一個排序期結束後到寫盤開始以前有一個小延時,這是由於機器正在忙於內部排序中間數據。寫盤速度持續大概2-4G/s。在計算開始後大概850秒左右寫盤完成。包括啓動部分,整個計算用了891秒。這個和TeraSort benchmark[18]的最高紀錄1057秒差很少。
須要注意的事情是:輸入速度要比排序速度和輸出速度快,這是由於咱們本地化的優化策略,絕大部分數據都是從本地硬盤讀取而上去了咱們相關的網絡消耗。排序速度比輸出速度快,這是由於輸出階段寫了兩份排序後的速度(咱們寫兩份的緣由是爲了可靠性可可用性的緣由)。咱們寫兩份的緣由是由於底層文件系統的可靠性和可用性的要求。若是底層文件系統用相似容錯編碼[14](erasure coding)的方式,而不採用複製寫的方式,在寫盤階段能夠下降網絡帶寬的要求。
在圖三(b),是咱們在關閉掉backup任務的時候,sort程序的執行狀況。執行流和上邊講述的圖3(a)很相似,可是這個關閉掉backup任務的時候,執行的尾巴很長,而且執行的尾巴沒有什麼有效的寫盤動做。在960秒之後,除了5個reduce之外,其餘reduce任務都已經完成。不過這些拖後腿的任務又執行了300秒才完成。整個計算化了1283秒,多了44%的執行時間。
在圖三(c)中,咱們演示了在sort程序執行過程當中故意暫時殺掉1746個worker中的200個worker進程的執行狀況。底層的集羣調度馬上在這些機器上從新建立了新的worker處理(由於咱們只是把這些機器上的處理進程殺掉,而機器依舊是能夠操做的)。
由於已經完成的map work丟失了(因爲相關的map worker被殺掉了),須要從新再做,因此worker死掉會致使一個負數的輸入速率。相關map任務的從新執行很快就從新執行了。整個計算過程在933秒內完成,包括了前邊的啓動時間(只比正常執行時間多了5%的時間)。
咱們在2003年1月寫了第一個版本的MapReduce函數庫,而且在2003年8月做了顯著的加強,包括了本地優化,worker機器之間的動態負載均衡等等。自那之後,MapReduce函數庫就普遍用於咱們平常處理的問題。它如今在Google內部各個領域內普遍應用,包括:
。大尺度的計算機學習問題。
。Google News和Froogle產品的集羣問題。
。從公衆查詢產品(好比Google的Zeitgeist)的報告中抽取數據。
。從web網頁做新試驗和抽取新的產品(例如,從大量的webpage中的本地查找抽取物理位置信息)。
。大尺度的圖型計算。
任務數 平均任務完成時間 使用的機器時間 |
29423 634秒 79,186天 |
讀取的輸入數據 產生的中間數據 寫出的輸出數據 |
3,288TB 758TB 193TB |
每一個job平均worker機器數 每一個job平均死掉work數 每一個job平均map任務 每一個job平均reduce任務 |
157 1.2 3,351 55 |
map惟一實現 reduce的惟一實現 map/reduce的combiner實現 |
395 296 426 |
表1:MapReduce2004年8月的執行狀況
圖四顯示了咱們的源代碼管理系統中,隨着時間推移,MapReduce程序的顯著增長,從2003年早先時候的0個增加到2004年9月份的差很少900個不一樣的程序。MapReduce之因此這樣成功是由於他可以在不到半小時時間內寫出一個簡單的可以應用於上千臺機器的大規模併發程序,而且極大的提升了開發和原形設計的週期效率。而且,他可讓一個徹底沒有分佈式和/或並行系統經驗的程序員,可以很容易的開發處理海量數據的程序。
在每個任務結束的時候,MapReduce函數庫記錄使用的計算資源的狀態。在表1,咱們列出了2004年8月份MapReduce運行的任務所佔用的相關資源。
到目前爲止,最成功的MapReduce的應用就是重寫了Google web 搜索服務所使用到的index系統。索引系統處理蠕蟲系統抓回來的超大量的數據,這些數據保存在GFS文件裏。普通這些文檔的大小是超過了20TB的數據。索引程序是經過一系列的,大概5到10次MapReduce操做來創建索引。經過利用MapReduce(替換掉上一個版本的特別設計的分佈處理的索引程序版本)有這樣一些好處:
l 索引代碼很簡單,很小,很容易理解。由於對於容錯的處理代碼,分佈以及並行處理代碼都經過MapReduce函數庫封裝了,因此索引代碼很簡單,很小,很容易理解。例如,當使用MapReduce函數庫的時候,計算的代碼行數從原來的3800行C++代碼一下減小到大概700行代碼。
l MapReduce的函數庫的性能已經很是好,因此咱們能夠把概念上不相關的計算步驟分開處理,而不是混在一塊兒以期減小處理次數。這使得咱們容易改變索引處理方式。好比,咱們對老索引系統的一個小更改可能要好幾個月的時間,可是在新系統內,只須要花幾天時間就能夠了。
l 索引系統的操做更容易了,這是由於機器的失效,速度慢的機器,以及網絡風暴都已經由MapReduce本身解決了,而不須要操做人員的交互。此外,咱們能夠簡單的經過對索引系統增長機器的方式提升處理性能。
不少系統都提供了嚴格的編程模式,而且經過對編程的嚴格限制來實現自動的並行計算。例如,一個結合函數能夠在一個N個元素的全部前綴上進行計算,而且使用併發前綴計算,會在在N個併發節點上會耗費log N的時間[6,9,13]。MapReduce是這些模式下的,一個咱們基於超大系統的現實經驗的一個簡化和精煉。而且,咱們還提供了基於上千臺處理器的容錯實現。而大部分併發處理系統都只在小規模的尺度上實現,而且機器的容錯仍是程序員來操心的。
Bulk Synchronous Programming[17]以及一些MPI primitives[11]提供了更高級別的抽象,能夠更容易寫出並行處理的程序。這些系統和MapReduce系統的不一樣之處在於,MapReduce是經過限制性編程模式自動實現用戶程序的併發處理,而且提供了透明的容錯處理。
咱們本地的優化策略是受active disks[12,15]等技術的影響的,在active disks中,計算任務是儘可能推送到數據在本地磁盤的節點處理,這樣就減小了網絡系統的I/O吞吐。咱們是在直接附帶幾個硬盤的通機器上執行咱們的計算工做,不是在磁盤處理器上執行咱們的工做,可是總的效果是同樣的。
咱們的backup task機制和早先CharlotteSystem[3]的機制比較相似。早先的簡單調度的一個缺點是若是一個任務致使反覆失效,那麼整個計算就不能完成。咱們經過在故障狀況下跳過故障記錄的方式,在某種程度上解決了這個問題。
MapReduce的實現依賴於一個內部的集羣管理系統,這個集羣管理系統負責在一個超大共享機器組上分佈和運行用戶任務。雖然這個不是本論文的重點,集羣管理系統在理念上和Condor[16]等其餘系統同樣。
MapReduce函數庫的排序部分和NOW-Sort[1]的操做上很相似。源機器(map workers)把待排序的數據進行分區,而且發送到R個reduce worker中的一個進行處理。每個reduce worker做本地排序(儘量在內存排序)。固然NOW-Sort沒有刻意用戶定義的Map和Reduce函數,而咱們的函數庫有,因此咱們的函數庫能夠有很高的適應性。
River[2]提供了一個編程模式,在這樣的編程模式下,處理進程能夠經過分佈式查詢來互相傳送數據的方式進行通信。和MapReduce相似,River系統嘗試提供對不一樣應用有近似平均的性能,即便在不對等的硬件環境下或者在系統顛簸的狀況下也能提供近似平均的性能。River是經過精心調度硬盤和網絡的通信,來平衡任務的完成時間。MapReduce的框架是經過限制性編程模式,來把問題分解成爲大量的任務。每個任務都是動態調度到可用的worker上執行,這樣快速的worker能夠執行更多的任務。限制性編程模式一樣容許咱們在接近計算完成的時候調度backup 任務,在出現處理不均勻的狀況下,大量的縮小整個完成的時間(好比在有慢機或者阻塞的worker的時候)。
BAD-FS[5]和MapReduce的編程模式徹底不一樣,它不像MapReduce是基於很大的網絡計算的。不過,這兩個系統有兩個基本原理很相似。(1)兩個系統都使用重複執行來防止因爲失效致使的數據丟失。(2)兩個都使用數據本地化調度策略,使得處理儘量在本地數據上進行,減小經過網絡通信的數據量。
TACC[7]是一個用於簡單構造高可用性網絡服務的系統。就像MapReduce,它依靠從新執行機制來實現的容錯處理。
MapReduce的編程模式在Google成功應用於許多方面。咱們把這種成功應用歸結爲幾個方面:首先,這個編程模式易於使用,即便程序員沒有並行或者分佈式系統經驗,因爲MapReduce封裝了並行的細節和容錯處理,本地化計算,負載均衡等等,因此,使得編程很是容易。其次,大量不一樣的問題均可以簡單經過MapReduce來解決。例如,MapReduce用於產生Google的web搜索服務所須要的數據,用來排序,用來數據挖掘,用於機器智能學習,以及不少其餘系統。第三,咱們已經在一個好幾千臺計算機的大型集羣上開發實現了這個MapReduce。這個實現使得對於這些機器資源的利用很是簡單,而且所以也適用於解決Google遇到的其餘不少須要大量計算的問題。
咱們也從MapReduce上學到了很多內容。首先,先執行編程模式使得並行和分佈式計算很是容易,而且也易於構造這樣的容錯計算環境。其次,網絡帶寬是系統的資源的瓶頸。咱們系統的一系列優化都使所以針對減小網絡傳輸量爲目的的:本地優化使得咱們讀取數據時,是從本地磁盤讀取的,而且寫出單箇中間數據文件到本地磁盤也節約了網絡帶寬。第三,冗餘執行能夠減小慢機器帶來的影響,而且解決因爲機器失效致使的數據丟失問題。
Josh Levenberg校定和擴展了用戶級別的MapReduce API,而且結合他的適用經驗和其餘人的改進建議,增長了不少新的功能。MapReduce使用Google文件系統GFS[8]來做爲數據和輸出。咱們還感謝Percy Liang Olcan Sercinoglu 在開發用於MapReduce的集羣管理系統得工做。Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach 爲本論文提出了寶貴的意見。OSDI的無名審閱者,以及咱們的審覈者Eric Brewer,在論文應當如何改進方面給出了有益的意見。最後,咱們感謝Google的工程部的全部MapReduce的用戶,感謝他們提供了有用的反饋,以及建議,以及錯誤報告等等。
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Patterson.High-performance sorting on networks of workstations.In Proceedings of the 1997 ACM SIGMOD InternationalConference on Management of Data, Tucson,Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River:Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS '99), pages 10.22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996. [4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22.28, April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78. 91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29.43, Lake George, New York, 2003. To appear in OSDI 2004 12
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par'96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401.408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831.838, 1980. [14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335.348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68.74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103.111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
本節包含了一個完整的程序,用於統計在一組命令行指定的輸入文件中,每個不一樣的單詞出現頻率。
#include "mapreduce/mapreduce.h"
// User's map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
// User's reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into "spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class("Adder");
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: 'result' structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}
崮山路上走9遍2005-8-8於大連完稿
BLOG: sharp838.mblogger.cn
EMAIL: sharp838@21cn.com;guangweishi@gmail.com
全部的版權歸於原做者。
感謝:朱朱,洋洋,sophia