Abstract程序員
MapReduce是一種編程模型和一種用來處理和產生大數據集的相關實現。用戶定義map函數來處理key/value鍵值對來產生一系列的中間的key/value鍵值對。還要定義一個reduce函數用來合併有着相同中間key值的中間value。許多現實世界中的任務均可以用這種模型來表達,就像下文所展現的那樣。web
用這個風格編寫的程序能夠自動並行地在集羣上工做。運行時系統會自動處理例如切割輸入數據,在機器之間調度程序的執行,處理機器故障以及管理必要的機器間通訊等細節問題。這可讓那些對於並行分佈式系統沒有任何經驗的程序員也能很簡單地利用起一個大的分佈式系統的資源。算法
咱們的MapReduce的實現運行在一個由大的商業機構成的集羣當中而且是高度可擴展的:一個典型的MapReduce計算要在上千臺機器中處理TB數量級的數據。程序員會以爲這個系統很是好用:已經有成千上萬的MapReduce程序被實現出來而且天天有上千個MapReduce任務運行在Google的集羣上。數據庫
1 Introduction編程
在過去五年中,做者和許多Google的其餘人已經實現了成百上千個用於特殊目的的計算程序用於處理大量的raw data,各類各樣的derived data。許多這種計算程序在概念上都是很是直接的。然而輸入的數據量每每很大,而且計算須要分佈在成百上千臺機器中爲了在一個可接受的時間內完成任務。可是除了簡單的計算模型之外,咱們須要大量複雜的代碼用來處理例如如何並行化計算、分發數據、處理故障等等問題。api
爲了解決這樣的複雜性,咱們設計了一種新的抽象,它讓咱們只須要表示出咱們想要執行的計算模型,而將背後複雜的並行化,容錯,數據分發,負載平衡等等技術的實現細節隱藏在了庫中。咱們這種新的抽象是受Lisp以及其餘一些函數式編程語言中的map和reduce原語影響而來的。咱們意識到許多的計算都須要對於輸入中的每一個邏輯「記錄」進行map操做,爲了計算一系列的中間鍵值對。而後還須要對全部共享同一個key的value進行reduce操做,從而可以對派生的數據進行適當的組合。咱們這種讓用戶自定義map和reduce操做的編程模型可以讓咱們簡單地對大量數據實現並行化,而且使用從新執行做爲主要的容錯機制。緩存
咱們這項工做的主要共享是提供了一個簡單而且強大的接口可以讓咱們實現自動的並行化而且分佈處理大規模的計算,同時該接口的實現能在大型的商用PC集羣上得到很是高的性能。網絡
Section 2描述了基本的編程模型以及一些簡單的例子。Section 3描述了爲咱們的基於集羣的計算環境量身定作的MapReduce接口。Section 4描述了一些咱們認爲有用的對於編程模型的改進。Section 5是對咱們的實如今不一樣任務下的性能測試。Section 6 包含了MapReduce在Google內的使用狀況,包括咱們以它爲基礎重寫咱們的產品索引系統的經驗。Section 7討論了相關的工做以及將來的發展。數據結構
2 Programming Model負載均衡
計算模型以一系列的鍵值對做爲輸入併產生一系列的鍵值對做爲輸出。MapReduce庫的用戶以「Map」和"Reduce"兩個函數來表達計算。
Map,是由用戶編寫的,獲取一個輸入對,而且產生一系列中間的鍵值對。MapReduce庫將那些具備相同的中間鍵I的中間值彙集在一塊兒,而後將它們傳遞給Reduce函數。
Reduce函數一樣是由用戶編寫的,接收一箇中間鍵I和該鍵對應的一系列的中間值。Reduce函數經過將這些值合併來組成一個更小的值的集合。一般每一個Reduce函數只產生0個或1個輸出值。Reduce函數通常經過一個迭代器來獲取中間值,從而在中間值的數目遠遠大於內存容量時,咱們也可以處理。
2.1 Example
下面來考慮這樣一個問題:統計大量文檔中每個單詞出現的次數。對此,用戶須要編寫相似於以下的僞代碼:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函數爲在每個單詞出現的時候,爲它加上一個計數(在這個簡單的例子中就是加1)。Reduce函數對每一個單詞的全部計數進行疊加。
另外,用戶須要用輸入輸出文件的名字,以及一個可選的tuning parameter去填充一個叫mapreduce specification 的對象。以後,用戶調用MapReduce函數,將定義的上述對象傳遞進去。用戶的代碼將和MapReduce庫相連(由C++實現)。Appendix A中有這個例子全部的代碼文檔。
2.2 Types
雖然在上述的僞代碼中輸入輸出都是字符串類型的,但事實上,用戶提供的Map和Reduce函數都是有相應類型的:
map (k1, v1) -> list(k2, v2)
reduce (k2, list(v2)) -> list(v2)
須要注意的是,輸入的key和value與輸出的key和value是不一樣的類型,而中間的key和value與輸出的key和value是相同的類型。咱們的C++實現都是以字符串的形式和用戶代碼進行交互的,至於將字符串類型轉換成相應合適的類型的工做則由用戶代碼來完成了。
2.3 More Example
接下來是一些可以簡單地用MapReduce計算模型進行表達的例子
Distributed Grep:Map函數獲取匹配提供的模式的行,Reduce函數只是簡單地將這些中間數據拷貝到輸出
Count of URL Access Frequency:Map函數處理web請求的日誌,而且輸出<URL, 1>。Reduce函數將擁有相同URL的value相加,獲得<URL, total count>對
Reverse Web-Link Graph:Map函數輸出<target, source>對,其中source所在的page都有連向target這個URL的連接。Reduce函數將給定target的全部的source URL鏈接起來,輸出<target, list(source)>對
Term-Vector per Host:一個term vector表示一系列<word, frequency>的鍵值對,word表示一篇或者一系列文章中出現的比較重要的單詞,frequency表示它們出現的次數。Map函數對於每篇輸入的文章輸出<hostname, term vector>鍵值對(其中hostname是從文章所在的URL中抽取出來的)Reduce函數獲取給定host的term vectors。它將這些term vectors累加起來,丟棄非頻繁出現的term,併產生一個最終的<hostname, term vector>對。
Inverted Index:Map函數對每篇文章進行處理,並輸出一系列的<word, document ID>對。Reduce函數接收給定word的全部鍵值對,對相應的document ID進行排序而且輸出<word, list<document ID>>對。全部輸出對的集合構成了一個簡單的倒排索引。用了MapReduce模型,對單詞位置的追蹤就變得很是簡單了。
Distributed Sort:Map函數從每一個record中抽取出key,產生<key, record>鍵值對。Reduce函數只是簡單地將全部對輸出。這個計算模型依賴於Section 4.1中描述的劃分技巧以及Section 4.2中描述的排序特性。
3 Implementation
對於MapReduce的接口,各類各樣不一樣的實現都是可能的。全部正確的實現都是基於應用環境的。好比,一種實現可能適合於小的共享內存的機器,另外一種可能適合於大型的NUMA多處理器機器,甚至有的是爲更大的互聯的機器集羣設計的。
本節中描述的實現基於的是Google中最經常使用的計算環境:一個由大量商用PC機經過交換以太網互聯的集羣。在咱們的環境中:
(1)、機器一般都是x86的雙核處理器,其上運行Linux,每臺機器擁有2-4G的內存
(2)、商用網絡硬件---一般是100 M/s或者1 G/s,可是綜合起來要小於平均帶寬
(3)、一個集羣由成千上萬臺機器組成,所以機器故障是常有的事
(4)、存儲由便宜的IDE磁盤提供,它們都與獨立的機器直接相連。一個內部研發的文件系統用於管理全部存儲於這些硬盤上的文件。該文件系統經過Replication在不可靠的硬件上提供了可用性和可靠性
(5)、用戶提交jobs給調度系統。每一個job由一系列的task組成,而且由調度器分配到集羣中一系列可用的機器上
3.1 Execution Overview
經過將輸入數據自動分割成M份,Map函數得以在多臺機器上分佈式執行。每個輸入塊都能並行地在不一樣的機器上執行。經過劃分函數(例如,hash(key) mod R)將中間鍵劃分爲R份,Reduce函數也能被分佈式地調用。其中劃分的數目R和劃分函數都是由用戶指定的。
上圖1展現了在咱們的實現中MapReduce所有的流程。當用戶程序調用MapReduce函數時,接下來的動做將按序發生(圖1中標記的數字與下面的數字是一一對應的):
(1)、用戶程序中的MapReduce庫首先將輸入文件劃分爲M片,每片大小通常在16M到64M之間(由用戶經過一個可選的參數指定)。以後,它在集羣的不少臺機器上都啓動了相同的程序拷貝。
(2)其中有一個拷貝程序是特別的----master。剩下的都是worker,它們接收master分配的任務。其中有M個Map任務和R個Reduce任務要分配。master挑選一個空閒的worker而且給它分配一個map任務或者reduce任務。
(3)、被分配到Map任務的worker會去讀取相應的輸入塊的內容。它從輸入文件中解析出鍵值對而且將每一個鍵值對傳送給用戶定義的Map函數。而由Map函數產生的中間鍵值對緩存在內存中。
(4)、被緩存的鍵值對會階段性地寫回本地磁盤,而且被劃分函數分割成R份。這些緩存對在磁盤上的位置會被回傳給master,master再負責將這些位置轉發給Reduce worker。
(5)、當Reduce worker從master那裏接收到這些位置信息時,它會使用遠程過程調用從Map worker的本地磁盤中獲取緩存的數據。當Reduce worker讀入所有的中間數據以後,它會根據中間鍵對它們進行排序,這樣全部具備相同鍵的鍵值對就都彙集在一塊兒了。排序是必須的,由於會有許多不一樣的鍵被映射到同一個reduce task中。若是中間數據的數量太大,以致於不可以裝入內存的話,還須要另外的排序。
(6)、Reduce worker遍歷已經排完序的中間數據。每當遇到一個新的中間鍵,它會將key和相應的中間值傳遞給用戶定義的Reduce函數。Reduce函數的輸出會被添加到這個Reduce部分的輸出文件中。
(7)、當全部的Map tasks和Reduce tasks都已經完成的時候,master將喚醒用戶程序。到此爲止,用戶代碼中的MapReduce調用返回。
當成功執行完以後,MapReduce的執行結果被存放在R個輸出文件中(每一個Reduce task對應一個,文件名由用戶指定)。一般用戶並不須要將R個輸出文件歸併成一個。由於它們一般將這些文件做爲另外一個MapReduce調用的輸入,或者將它們用於另一個可以以多個文件做爲輸入的分佈式應用。
3.2 Master Data Structures
在master中保存了許多的數據結構。對於每一個Map task和Reduce task,master都保存了它們的狀態(idle,in-progress或者是completed)以及worker所在機器的標識(對於非idle狀態的tasks而言)。
master至關因而一個管道,經過它Map task所產生的中間文件被傳遞給了Reduce task。所以,對於每個已經完成的Map task,master會存儲由它產生的R箇中間文件的位置和大小。當Map task完成的時候,master就會收到位置和大小的更新信息。而這些信息接下來就會逐漸被推送處處於in-progress狀態的Reduce task中。
3.3 Fault Tolerance
由於MapReduce庫的設計初衷是用成千上萬的機器去處理大量的數據,因此它就必須能用優雅的方式對機器故障進行處理。
Worker Failure
master會週期性地ping每個worker。若是通過了一個特定的時間還未從某一個worker上得到響應,那麼master會將worker標記爲failed。全部由該worker完成的Map task都被回退爲idle狀態,所以可以被從新調度到其餘的worker上。一樣的,全部failed worker正在執行的Map task或者Reduce task也會被回退爲idle狀態,而且被從新調度。
發生故障的機器上已經完成的Map task須要從新執行的緣由是,它們的輸入是保存在本地磁盤的,所以發生故障以後就不能獲取了。而已經完成的Reduce task並不須要被從新執行,由於它們的輸出是存放在全局的文件系統中的。
當一個Map task開始由worker A執行,後來又由worker B執行(由於A故障了)。全部執行Reduce task的worker都會收到這個從新執行的通知。那些還未從worker A中讀取數據的Reduce task將會從worker B中讀取數據。
MapReduce對於大面積的機器故障是很是具備彈性的。例如,在一次MapReduce操做中,網絡維護形成了集羣中八十臺機器在幾分鐘的時間內處於不可達的狀態。MapReduce的master只是簡單地將不可達的worker機器上的工做從新執行了一遍,接着再繼續往下執行,最終完成了MapReduce的操做。
Master Failure
對於master,咱們能夠簡單地對上文所述的master數據結構作週期性的快照。若是一個master task死了,咱們能夠很快地根據最新的快照來從新啓動一個master task。可是,由於咱們只有一個master,所以故障的機率比較低。因此,在咱們的實現中若是master出現了故障就只是簡單地中止MapReduce操做。用戶能夠檢測到這種狀況,而且若是他們須要的話能夠從新開始一次MapReduce操做。
Semantics in the Presence of Failures
若是用戶提供的Map和Reduce操做是關於輸入值的肯定性函數,那麼咱們分佈式的實現將會產生一樣的輸出,在整個程序通過沒有出現故障的順序執行以後。
咱們依賴Map task和Reduce task原子性地提交輸出來實現上述特性。每個正在執行的task都會將它的輸出寫到一個私有的臨時文件中。一個Reduce task產生一個這樣的文件,而一個Map task產生R個這樣的文件(每一個Reduce work一個)。當一個Map task完成的時候,worker就會給master發送一個信息,,其中包含了R個臨時文件的名字。若是master收到了一個來自於已經完成了的Map task的完成信息,那麼它就將它自動忽略。不然,將R個文件的名稱記錄到一個master數據結構中。
當一個Reduce task完成的時候,Reduce worker會自動將臨時輸出文件命名爲最終輸出文件。若是同一個Reduce task在多臺機器上運行,那麼多個重命名操做產生的最終輸出文件名將會產生衝突。對此,咱們依賴底層文件系統提供的原子重命名操做來保證最終文件系統中的數據來自一個Reduce task。
大多數的Map和Reduce操做都是肯定性的,事實上,咱們的語義等同於順序執行。所以這讓程序員很是容易地可以解釋他們程序的行爲。當Map和Reduce操做是非肯定性的時候,咱們提供較弱,但仍然合理的語義。在非肯定性的操做中,對於一個特定的Reduce task R1的輸出是和非肯定性程序順序執行產生R1產生的輸出是相同的。然而,對於另外一個Reduce task R2,它的輸出對應於非肯定性程序另外一個順序執行的結果。
下面考慮Map task M和Reduce task R1和R2。讓e(Ri)表示Ri的執行結果。更弱的語義意味着,e(R1)可能從M的一次執行結果中讀取輸入,而e(R2)可能從M的另外一次執行中讀取輸入。
3.4 Locality
網絡帶寬在咱們的計算環境中是相對稀缺的資源。咱們經過將輸入數據存儲在集羣中每臺機器的本地磁盤的方法來節省帶寬。GFS將輸入文件切分紅64MB大小的塊,而且將每一個塊的多份拷貝(一般爲3份)存儲在不一樣的機器上。MapReduce的master獲取全部輸入文件的位置信息,而後將Map task調度到有相應輸入文件副本的機器上。當發生故障時,再將Map task調度到鄰近的具備該task輸入文件副本的機器(即在同一臺交換機內具備相同數據的機器)。當在一個集羣的大量機器上作MapReduce操做時,大多數的輸入數據都是從本地讀取的,而不用消耗帶寬。
3.5 Task Granularity
如上所述,咱們將Map操做分紅M份,Reduce操做分紅R份。在理想的狀況下,M和R的值應該要比集羣中worker machine的數量多得多。讓一個worker同時進行許多不一樣的task有利於提升動態的負載均衡,同時在一個worker故障的時候能儘快恢復。許多已經完成的Map task也能儘快地傳播到其餘全部的worker machine上。
在咱們的實現中,M和R的大小是有一個實用範圍的。由於咱們的master須要作O(M+R)個調度決定,而且還要在內存中保存O(M*R)個狀態。(可是內存使用的常數仍是比較小的,O(M*R)個Map task/Reduce task 狀態對,每一個的大小大概在一個字節)
另外,R一般受限於用戶,由於每一個Reduce task的輸出都分散在不一樣的輸出文件中。事實上,咱們會選擇M,所以每一個輸入文件大概16MB到64MB的輸入文件(所以上文所述的局部性優化會達到最優)。而咱們會讓R成爲worker machine數量的一個較小的倍數。所以,咱們一般在進行MapReduce操做時,將M設爲200000,R設爲5000,使用2000個worker machine。
3.6 Backup Tasks
「straggler」(落伍的士兵)的存在是拖慢整個MapReduce操做的一般的緣由之一。所謂的"straggler"是指一臺機器用了過長的時間去完成整個計算任務中最後幾個Map或者Reduce task。Straggler出現的緣由有不少。好比一臺機器上硬盤壞了,它就會經歷大量的可糾正錯誤,從而讓它的性能從30MB/s降低到1MB/s。集羣的調度系統可能將其餘task調度到該機器上,致使它執行MapReduce代碼的速度變慢不少,由於CPU,內存,本地磁盤,網絡帶寬的競爭加重。咱們最近遇到的一個問題是一臺機器的初始化代碼有點問題,它會致使處理器的緩存被禁用,在這些受影響的機器上進行的計算速度會降低到原來的百分之一。
對此,咱們有一個通用的機制用來緩解straggler的問題。當MapReduce操做接近結束的時候,master會將那些還在執行的task的備份進行調度執行。不管是原來的仍是備份執行完成,該task都被標記爲已完成。咱們經過調整將該操做致使的計算資源消耗僅僅提升了幾個百分點。可是在完成大型的MapReduce操做時,卻讓整個執行時間降低了好多。例如,Section 5.3中所描述的排序算法在備份機制關閉的狀況下,須要多消耗44%的時間。
4 Refinement
雖然對於大多數需求由Map和Reduce函數提供的功能已經足夠了,可是咱們仍是發現了一些有用的擴展。對它們的描述以下。
4.1 Partitioning Function
MapReduce用戶決定他們的Reduce task或者輸出文件的數目R。經過一個劃分函數,根據中間鍵值將各個task的數據進行劃分。默認的劃分函數是經過哈希(好比,hash(key) mod R)。這一般會產生很是好的較爲均衡的劃分。可是在其餘一些狀況下,經過鍵值的其餘函數來劃分要更好一些。例如,有的時候輸出鍵值是一些URL,咱們但願同一個host的內容能放在同一個輸出文件中。爲了支持這種狀況,MapReduce庫的用戶能夠提供一個特殊的劃分函數。例如,使用「hash(Hostname(urlKey)) mod R」做爲劃分函數,從而讓全部來自於同一個host的URL的內容都輸出到同一個輸出文件。
4.2 Ordering Guarantees
咱們確保在一個給定的劃分中,中間的鍵值對都按照鍵值的升序進行處理。這樣的處理順序確保了每個劃分產生一個排好序的輸出文件。這樣的話,若是輸出文件格式須要支持根據key進行有效的隨機查找會比較方便。同時,輸出的用戶也會以爲已經排好序的數據使用起來特別方便。
4.3 Combiner Function
在有些狀況下,每一個Map task都會產生大量的中間鍵的重複而用戶指定的Reduce函數是交互和關聯的。Section 2.1中的單詞統計就是一個很好的例子。由於單詞的出現頻率服從於Zipf分佈,每一個Map Task都會產生成百上千個<the, 1>這樣的記錄。全部這些記錄都會經過網絡被送到一個Reduce task中,而且由Reduce函數加在一塊兒去產生一個數。咱們容許用戶使用了可選的Cominer函數,用於在網絡傳輸以前部分地進行歸併操做。
Combiner函數在每一個執行Map task的機器上執行。一般Combiner和Reduce函數使用的是相同的代碼。Reduce函數和Combiner函數惟一的不一樣是MapReduce庫如何處理函數的輸出。Reduce函數的輸出寫到最終的輸出文件中。而Combiner函數的輸出會被寫到一個最終將被送給Reduce task的中間文件中。
部分的合併操做能極大地加速某類特定的MapReduce操做。Appendix A包含了一個使用Combiner的例子。
4.4 Input and Output Types
MapReduce庫提供了對讀入數據文件多種的格式支持。例如,"text"格式的輸入將每一行做爲鍵值對:key是文件內的偏移,value是該行的內容。另一種比較經常使用的格式存儲一系列按照鍵進行排序的鍵值對。每個輸出格式的實現都知道如何將本身進行合理的劃分從而能讓不一樣的Map task進行處理(例如,text模式就知道將區域劃分到以行爲邊界)。用戶能夠經過簡單地定義一個reader接口來提供一個新的輸入類型的實現。事實上,大多數用戶只使用了預約義輸入類型的很小一部分。
reader並不必定要從文件中讀取數據。例如,咱們能夠很容易地定義一個從數據庫,或者內存中映射的數據結構中讀取記錄的reader。
同理,咱們也支持產生不一樣格式的輸出數據,用戶也能編寫新的輸出數據格式。
4.5 Side-effects
在有些狀況下,MapReduce的用戶會很容易發現Map或者Reduce操做會產生一些輔助文件做爲額外的輸出文件。咱們依賴應用的編寫者去保證這些反作用是原子和冪等的。通常來講,應用會寫到一個臨時文件中,而且在它徹底產生以後,經過一個原子操做將它重命名。
對於一個單一的task產生的多個輸出文件,咱們不提供原子性的兩相提交支持。所以,產生多個輸出文件而且有跨文件一致性要求的task須要是肯定性的。可是這樣的限制在實踐過程當中並非什麼問題。
4.5 Skipping Bad Records
有時候,若是用戶的代碼中有bug的話,會致使Map或者Reduce操做在某些記錄上崩潰。這些bug會致使MapReduce操做的正常完成。對於這種狀況,一般就是去修bug。不過有時候這是不可行的,也許bug是第三方庫形成的,而咱們並不能獲得它的源代碼。並且,有時候咱們容許忽略掉一些記錄,例如在對一個大數據集作分析的時候。所以咱們提供了一種可選的執行模式,當MapReduce庫檢測到一些記錄會形成崩潰時,就會主動跳過它們,從而保證正常地運行。
每個worker進程都安裝了一個signal handler用於捕捉段錯誤和bug。在調用用戶的Map和Reduce操做以前,MapReduce庫會將參數的序號保存在一個全局變量中。若是用戶代碼產生了一個信號,signal handler就會傳輸一個參數含有序號的"last gasp"UDP包給MapReduce的master。當master在一個特定的記錄中發現了不知一次的錯誤,這表示在下一次執行相應的Map或者Reduce操做的時候一個將它跳過。
4.7 Local Execution
Map或者Reduce函數的調試問題是很是tricky的。由於實際的計算髮生在分佈式的系統中,一般由成百上千臺機器組成,而且工做的分配由master動態執行。爲了幫助調試,分析,以及小規模的測試,咱們開發了另一個MapReduce庫的實現,它可以在本地機器上順序執行一個MapReduce操做的全部工做。它的控制交給用戶,所以計算能夠被限定到制定的Map task中執行。用戶利用指定的flag啓動程序,而後就能很是簡單地使用任何它們以爲有用的調試或者測試工具了。
4.8 Status Information
master運行了一個內置的HTTP server而且暴露了一系列供人類使用的狀態頁。狀態頁會顯示程序的計算過程,例如已經完成了多少個task,還有多少個task正在執行,輸入的字節數,中間數據的字節數,輸出的字節數,以及處理速度等等。該頁還包含了指向各個task的標準錯誤和標準輸出連接。用戶能夠利用這些數據來判斷計算會持續多長時間,以及計算是否須要添加更多的資源。這些頁面還能用來發現何時處理速度比預期地降低好多。
另外,頂層的狀態頁顯示了那些worker出錯了,以及在它們出錯時正在執行哪些Map和Reduce task。這些信息在診斷用戶代碼出現的bug時是很是有用的。
4.9 Counter
MapReduce庫提供了一個叫counter的設施用於統計各類不一樣事件出現的次數。例如,用戶可能想要統計已經處理過的單詞的數目或者德國文件的索引數量。
爲了使用這一特性,用戶代碼建立一個命名的counter對象,而且在Map以及Reduce函數中對counter進行增長。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase")
map(String name, String contents):
for each word w in contents:
if(IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
每一個worker機器上counter的值會按期傳給master(捎帶在給master的ping回覆中)。master未來自成功執行的Map和Reduce task的counter值彙集起來。而後在MapReduce操做完成以後返回給用戶代碼。當前的counter值也會顯示在master的狀態頁上,因此用戶能從實時觀看計算的進行。在彙集counter的值的時候,master會消除Map或者Reduce task的重複執行形成的重複計算。(重複執行可能由backup tasks或者由於錯誤從新執行的task引發)。
有些counter的值是由MapReduce庫自動維護的,例如已經處理的輸入鍵值對數目以及已經產生的輸出鍵值對數目。
用戶發現counter特性對於檢查MapReduce操做的執行是很是有用的。例如,在有些MapReduce操做中,用戶代碼想要確保產生的輸出對的數目和已經處理的輸入對的數目是剛好相等的,或者處理的德語文件的數目佔總處理文件數目的比重在一個可容忍的範圍內。
5 Performance
在這個section中,咱們經過運行在一個集羣上的兩個computation來測試MapReduce的性能。一個Computation搜索一個T的數據,從中獲取一個特定的模式。另外一個computation對一個T的數據進行排序。
這兩個程序表明了由用戶實際編寫的MapReduce程序的一個子集------一類程序用於將數據從一種表示方法切換到另外一種表示方法。另外一類程序則從大數據集中抽取出一小部分有趣的數據。
5.1 Cluster Configuration
全部程序都運行在一個由1800臺機器組成的機器上。每一臺機器都有兩個2GHz 的Intel Xeon處理器,而且Hyper-Threading打開, 4GB內存,兩個160GB的IDE磁盤,以及一個G的以太網鏈路。這些機器被安排在一個兩層樹狀的交換網絡中,根節點的帶寬大概在100-200Gbps。由於全部機器都在同一個託管設備中,所以任意兩臺機器見的通訊時間少於1ms。
其中4GB中的1-1.5G是爲集羣中運行的其餘任務預留的。程序在一個週末的下午運行,此時CPU,磁盤,網絡基本都處於空閒狀態。
5.2 Grep
grep程序須要掃描10的十次方條100-byte的記錄,搜索一個相對罕見的三字符模式(出現了92337次)。輸入被分紅大概64MB份(M = 15000),全部的輸出文件都存放在一個文件中(R = 1)。
Figure 2顯示了Computation隨着時間的變化過程。Y軸表明了輸入數據的掃描速度。隨着機器逐漸加入MapReduce的計算當中,速度愈來愈快,當有1764個worker加入時,達到峯值30GB/s。隨着Map task的結束,速度開始降低而且在80s的時候到達0,。整個Computation從開始到結束總共花費了大概150s。這其中還包括了1分鐘的啓動開銷。開銷主要來源於將程序分發到worker machine中,和GFS交互並打開1000個輸入文件,以及獲取局部性優化所需的信息的延時。
5.3 Sort
排序程序用於對10的十次方條記錄(大概1T的數據)進行排序。程序以TeraSort benchmark爲模型。