MapReduce:大型集羣上的簡單數據處理

MapReduce:大型集羣上的簡單數據處理程序員

摘要web

MapReduce是一個設計模型,也是一個處理和產生海量數據的一個相關實現。用戶指定一個用於處理一個鍵值(key-value)對生成一組key/value對形式的中間結果的map函數,以及一個將中間結果鍵相同的鍵值對合併到一塊兒的reduce函數。許多現實世界的任務都能知足這個模型,如這篇文章所示。數據庫

使用這個功能形式實現的程序可以在大量的普通機器上並行執行。這個運行程序的系統關心下面的這些細節:輸入數據的分區、一組機器上調度程序執行、處理機器失敗問題,以及管理所需的機器內部的通訊。這使沒有任何並行處理和分佈式系統經驗的程序員可以利用這個大型分佈式系統的資源。編程

咱們的MapReduce實現運行在一個由普通機器組成的大規模集羣上,具備很高的可擴展性:一個典型的MapReduce計算會在幾千臺機器上處理許多TB的數據。程序員們發現這個系統很容易使用:目前已經實現了幾百個MapReduce程序,在Google的集羣上,天天有超過一千個的MapReduce工做在運行。數組

1、        介紹緩存

在過去的5年中,本文做者和許多Google的程序員已經實現了數百個特定用途的計算程序,處理了海量的原始數據,包括抓取到的文檔、網頁請求日誌等,計算各類衍生出來的數據,如反向索引、網頁文檔的圖形結構的各類表示、每一個host下抓取到的頁面數量的總計、一個給定日期內的最頻繁查詢的集合等。大多數這種計算概念明確。然而,輸入數據一般都很大,而且計算必須分佈到數百或數千臺機器上以確保在一個合理的時間內完成。如何並行計算、分佈數據、處理錯誤等問題使這個起初很簡單的計算,因爲增長了處理這些問題的不少代碼而變得十分複雜。網絡

爲了解決這個複雜問題,咱們設計了一個新的抽象模型,它容許咱們將想要執行的計算簡單的表示出來,而隱藏其中並行計算、容錯、數據分佈和負載均衡等很麻煩的細節。咱們的抽象概念是受最先出如今lisp和其它結構性語言中的map和reduce啓發的。咱們認識到,大多數的計算包含對每一個在輸入數據中的邏輯記錄執行一個map操做以獲取一組中間key/value對,而後對含有相同key的全部中間值執行一個reduce操做,以此適當的合併以前的衍生數據。由用戶指定map和reduce操做的功能模型容許咱們可以簡單的進行並行海量計算,並使用re-execution做爲主要的容錯機制。數據結構

這項工做的最大貢獻是提供了一個簡單的、強大的接口,使咱們可以自動的進行並行和分佈式的大規模計算,經過在由普通PC組成的大規模集羣上實現高性能的接口來進行合併。負載均衡

第二章描述了基本的編程模型,並給出了幾個例子。第三章描述了一個爲咱們的聚類計算環境定製的MapReduce接口實現。第四章描述了咱們發現對程序模型頗有用的幾個優化。第六章探索了MapReduce在Google內部的使用,包括咱們在將它做爲生產索引系統重寫的基礎的一些經驗。第七章討論了相關的和將來的工做。框架

2、        編程模型

這個計算輸入一個key/value對集合,產生一組輸出key/value對。MapReduce庫的用戶經過兩個函數來標識這個計算:Map和Reduce。

Map,由用戶編寫,接收一個輸入對,產生一組中間key/value對。MapReduce庫將具備相同中間key I的聚合到一塊兒,而後將它們發送給Reduce函數。

Reduce,也是由用戶編寫的,接收中間key I和這個key的值的集合,將這些值合併起來,造成一個儘量小的集合。一般,每一個Reduce調用只產生0或1個輸出值。這些中間值通過一個迭代器(iterator)提供給用戶的reduce函數。這容許咱們能夠處理因爲數據量過大而沒法載入內存的值的鏈表。

2.1 例子

考慮一個海量文件集中的每一個單詞出現次數的問題,用戶會寫出相似於下面的僞碼:

 

Map函數對每一個單詞增長一個相應的出現次數(在這個例子中僅僅爲「1」)。Reduce函數將一個指定單詞全部的計數加到一塊兒。

此外,用戶使用輸入和輸出文件的名字、可選的調節參數編寫代碼,來填充一個mapreduce規格對象,而後調用MapReduce函數,並把這個對象傳給它。用戶的代碼與MapReduce庫(C++實現)鏈接到一塊兒。。附錄A包含了這個例子的整個程序。

2.2 類型

儘管以前的僞代碼中使用了字符串格式的輸入和輸出,可是在概念上,用戶定義的map和reduce函數須要相關聯的類型:

map       (k1, v1)                      -->         list(k2, v2)

reduce   (k2, list(v2))                -->          list(v2)

也就是說,輸入的鍵和值和輸出的鍵和值來自不一樣的域。此外,中間結果的鍵和值與輸出的鍵和值有相同的域。

MapReduce的C++實現與用戶定義的函數使用字符串類型進行參數傳遞,將類型轉換的工做留給用戶的代碼來處理。

2.3 更多的例子

這裏有幾個簡單有趣的程序,可以使用MapReduce計算簡單的表示出來。

分佈式字符串查找(Distributed Grep):map函數將匹配一個模式的行找出來。Reduce函數是一個恆等函數,只是將中間值拷貝到輸出上。

URL訪問頻率計數(Count of URL Access Frequency):map函數處理web頁面請求的日誌,並輸出<URL, 1>。Reduce函數將相同URL的值累加到一塊兒,生成一個<URL, total count>對。

翻轉網頁鏈接圖(Reverse Web-Link Graph):map函數爲在一個名爲source的頁面中指向目標(target)URL的每一個連接輸出<target, source>對。Reduce函數將一個給定目標URL相關的全部源(source)URLs鏈接成一個鏈表,並生成對:<target, list(source)>。

主機關鍵向量指標(Term-Vector per Host):一個檢索詞向量將出如今一個文檔或是一組文檔中最重要的單詞概述爲一個<word, frequency>對鏈表。Map函數爲每一個輸入文檔產生一個<hostname, term vector>(hostname來自文檔中的URL)。Reduce函數接收一個給定hostname的全部文檔檢索詞向量,它將這些向量累加到一塊兒,將罕見的向量丟掉,而後生成一個最終的<hostname, term vector>對。

倒排索引(Inverted Index):map函數解析每一個文檔,並生成一個<word, document ID>序列。Reduce函數接收一個給定單詞的全部鍵值對,全部的輸出對造成一個簡單的倒排索引。能夠經過對計算的修改來保持對單詞位置的追蹤。

分佈式排序(Distributed Sort):map函數將每一個記錄的key抽取出來,並生成一個<key, record>對。Reduce函數不會改變任何的鍵值對。這個計算依賴了在4.1節提到的分區功能和4.2節提到的排序屬性。

3、        實現

MapReduce接口有不少不一樣的實現,須要根據環境來作出合適的選擇。好比,一個實現可能適用於一個小的共享內存機器,而另外一個實現則適合一個大的NUMA多處理器機器,再另外一個可能適合一個更大的網絡機器集合。

這一章主要描述了針對在Google內部普遍使用的計算環境的一個實現:經過交換以太網將大量的普通PC鏈接到一塊兒的集羣。在咱們的環境中:

(1)    機器一般是雙核x86處理器、運行Linux操做系統、有2-4G的內存。

(2)    使用普通的網絡硬件—一般是100Mb/s或者是1Gb/s的機器帶寬,可是平均值遠小於帶寬的一半。

(3)    由數百臺或者數千臺機器組成的集羣,所以機器故障是很日常的事

(4)    存儲是由直接裝在不一樣機器上的便宜的IDE磁盤提供。一個內部的分佈式文件系統用來管理存儲這些磁盤上的數據。文件系統在不可靠的硬件上使用副本機制提供了可用性和可靠性。

(5)    用戶將工做提交給一個調度系統,每一個工做由一個任務集組成,經過調度者映射到集羣中可用機器的集合上。

3.1 執行概述

經過自動的將輸入數據分區成M個分片,Map調用被分配到多臺機器上運行。數據的分片可以在不一樣的機器上並行處理。使用分區函數(如,hash(key) mod R)將中間結果的key進行分區成R個分片,Reduce調用也被分配到多臺機器上運行。分區的數量(R)和分區函數是由用戶指定的。

 

圖1:執行概述

圖1中顯示了咱們實現的一個MapReduce操做的整個流程。當用戶程序調用MapReduce函數時,下面一系列的行爲將會發生(圖1中所使用的數字標識將與下面列表中的相對應):

1. 用戶程序中的MapReduce庫會先將輸入文件分割成M個一般爲16MB-64MB大小的片(用戶能夠經過可選參數進行控制)。而後它將在一個集羣的機器上啓動許多程序的拷貝。

2. 這些程序拷貝中的一個是比較特殊的——master。其它的拷貝都是工做進程,是由master來分配工做的。有M個map任務和R個reduce任務被分配。Master挑選出空閒的工做進程,並把一個map任務或reduce任務分配到這個進程上。

3. 一個分配了map任務的工做進程讀取相關輸入分片的內容,它將從輸入數據中解析出key/value對,並將其傳遞給用戶定義的Map函數。Map函數生成的中間key/value對緩存在內存中。

4. 緩存中的鍵值對週期性的寫入到本地磁盤,並經過分區函數分割爲R個區域。將這些緩存在磁盤上的鍵值對的位置信息傳回給master,master負責將這些位置信息傳輸給reduce工做進程。

5. 當一個reduce工做進程接收到master關於位置信息的通知時,它將使用遠程調用函數(RPC)從map工做進程的磁盤上讀取緩存的數據。當reduce工做進程讀取完全部的中間數據後,它將全部的中間數據按中間key進行排序,以保證相同key的數據聚合在一塊兒。這個排序是須要的,由於一般許多不一樣的key映射到相同的reduce任務上。若是中間數據的總量太大而沒法載入到內存中,則須要進行外部排序。

6. reduce工做進程迭代的訪問已排序的中間數據,而且對遇到的每一個不一樣的中間key,它會將key和相關的中間values傳遞給用戶的Reduce函數。Reduce函數的輸出追加到當前reduce分區一個最終的輸出文件上。

7. 當全部的map任務和reduce任務完成後,master會喚醒用戶程序。這時候,用戶程序中的MapReduce調用會返回到用戶代碼上。

在成功完成後,MapReduce操做輸出到R個輸出文件(每一個reduce任務生成一個,文件名是由用戶指定的)中的結果是有效的。一般,用戶不須要合併這R個輸出文件,它們常常會將這些文件做爲輸入傳遞給另外一個MapReduce調用,或者在另外一個處理這些輸入分區成多個文件的分佈式應用中使用。

3.2 Master數據結構

Master保留了幾個數據結構。對於每一個Map和Reduce任務,它存儲了它們的狀態(idle、in-progress或者completed),以及工做進程機器的特性(對於非空閒任務)。

Master是中間文件區域的位置信息從map任務傳送到reduce任務的一個通道。所以,對於每一個完成的map任務來講,master存儲了map任務產生的R箇中間文件區域的位置信息和大小。在map任務完成時,master會接收到更新這個含有位置信息和大小信息的消息。信息被增量的傳輸到運行in-progress的reduce任務的工做進程上。

3.3 容錯

由於MapReduce庫是被設計成運行在數百或數千臺機器上幫助處理海量數據的,因此這個庫必須可以優雅的處理機器故障。

工做進程故障

Master週期性的ping每一個工做進程,若是在一個特定的時間內沒有收到響應,則master會將這個工做進程標記爲失效。任何由失效的工做進程完成的map任務都被標記爲初始idle狀態,所以這個map任務會被從新分配給其它的工做進程。一樣的,任何正在處理的map任務或reduce任務也會被置爲idle狀態,進而能夠被從新調度。

在一個失效的節點上完成的map任務會被從新執行,由於它們的輸出被存放在失效機器的本地磁盤上,而磁盤不可訪問。完成的reduce任務不須要從新執行,由於它們的輸出被存儲在全局文件系統上。

當一個map任務先被工做進程A執行,而後再被工做進程B執行(由於A失效了),全部執行reduce任務的工做進程都會接收到從新執行的通知,任何沒有從工做進程A上讀取數據的reduce任務將會從工做進程B上讀取數據。

MapReduce對於大規模工做進程失效有足夠的彈性。好比,在一個MapReduce操做處理過程當中,網絡維護形成了80臺機器組成的集羣幾分鐘內不可達。MapReduce的master會從新執行那些在不可達機器上完成的工做,並持續推動,最終完成MapReduce操做。

Master故障

將上面提到的master數據結構週期性的進行寫檢查點操做(checkpoint)是比較容易的。若是master任務死掉,一個新的拷貝會從最近的檢查點狀態上啓動。然而,假定只有一個單獨的master,它的故障是不大可能的。所以,若是master故障,咱們當前的實現是停止MapReduce計算。

當前故障的語義

當用戶提供的map和reduce操做是輸入肯定性函數,咱們的分佈式實現與無端障序列執行整個程序所生成的結果相同。

咱們依靠map和reduce任務輸出的原子性提交來實現這個屬性。每一個in-progress任務將它們的輸出寫入到一個私有的臨時文件中。一個reduce任務產生一個這樣的文件,一個map任務產生R個這樣的文件(每一個reduce任務一個)。當一個map任務完成時,它將發送給master一個消息,其中包括R個臨時文件的名字。若是master收到一個已經完成的map任務的完成消息,則忽略這個消息。不然,它將這R個文件名記錄在master的數據結構中。

當一個reduce任務完成後,reduce的工做進程自動的將臨時文件改名爲最終的輸出文件,若是相同的reduce任務運行在多臺機器上,會調用多個重命名操做將這些文件改名爲最終的輸出文件。

絕大部分的map和reduce操做是肯定性的,事實上,在這種狀況下咱們的語義與一個序列化的執行是相同的,這使程序開發者可以簡單的推出他們程序的行爲。當map和/或reduce操做是不肯定性的時,咱們提供較弱但依然合理的語義。在不肯定性的操做面前,一個特定的reduce任務R1的輸出與一個序列執行的不肯定性程序生成的輸出相同。然而,一個不一樣的reduce任務R2的輸出可能與一個不一樣的序列執行的不肯定性程序生成的輸出可能一致。

考慮map任務M和reduce任務R1和R2。假定e(Ri)是提交的Ri的執行過程(有且僅有這樣一個過程)。e(R1)可能從M的一個執行生成的輸出中讀取數據,e(R2)可能從M的一個不一樣執行生成的輸出中讀取數據,則會產生較弱的語義。

3.4 位置

在咱們的計算環境中,網絡帶寬是一個相對不足的資源。咱們經過將輸入數據存放在組成集羣的機器的本地磁盤來節省網絡帶寬。GFS將每一個文件分割成64MB大小的塊,每一個塊會在不一樣的機器上存儲幾個拷貝(一般爲3個)。MapReduce master會考慮文件的位置信息,並試圖將一個map任務分配到包含相關輸入數據副本的機器上。若是這樣作失敗,它會試圖將map任務調度到一個包含任務輸入數據的臨近的機器上(例如,與包含輸入數據機器在同一個網絡下進行交互的一個工做進程)。當在集羣的一個有效部分上運行大規模的MapReduce操做時,大多數輸入數據都從本地讀取,不消耗任何網絡帶寬。

3.5 任務粒度

根據上面所提到的,咱們將map階段細分爲M個片,將reduce階段細分爲R個片。理想狀況下,M和R應該比工做機器的數量大得多,每一個工做進程執行不少不一樣的任務來促使負載均衡,在一個工做進程失效時也可以快速的恢復:許多完成的map任務能夠傳播到其它全部的工做機器上。

在咱們的實現中,對於取多大的M和R有一個實際的界限,由於如上面提到的那樣,master必須進行O(M+R)次調度,在內存中保持O(M*R)個狀態。(對內存使用的恆定因素影響較小,然而:對由每一個map任務/reduce任務對佔用大約一個字節所組成的O(M*R)片的狀態影響較大。)

此外,R常常是由用戶約束的,由於每一個reduce任務的輸出最終放在一個分開的輸出文件中。實際中,咱們傾向選擇M值,以使每個獨立的任務可以處理大約16MB到64MB的輸入數據(可使上面提到的位置優化有更好的效果),把R值設置爲咱們想使用的工做機器的一個小的倍數。咱們常用2000個工做機器,設置M=200000和R=5000,來執行MapReduce計算。

3.6 備用任務

影響一個MapReduce操做總體執行時間的一個一般因素是「落後者」:一個使用了異常的時間完成了計算中最後幾個map任務或reduce任務中的一個的機器。可能有不少因素致使落後者的出現,例如,一個含有損壞磁盤的機器頻繁的處理可校訂的錯誤,使它的讀取速度從30MB/s降低到了1MB/s。集羣調度者可能將其它的任務分配到這個機器上,因爲CPU、內存、磁盤或網絡帶寬的競爭會致使MapReduce代碼執行的更慢。咱們遇到的最近一個問題是機器初始化代碼中的一個bug,它會使處理器的緩存不可用:受到這個問題影響的機器會慢上百倍。

咱們使用一個普通的機制來緩解落後者問題。當一個MapReduce操做接近完成時,master調度備用(backup)任務執行剩下的、處於in-process狀態的任務。一旦主任務或是備用任務完成,則將這個任務標識爲已經完成。咱們優化了這個機制,使它一般可以僅僅增長少許的操做所使用的計算資源。咱們發現這能有效的減小完成大規模MapReduce操做所須要的時間。做爲一個例子,5.3節所描述的那種程序在禁用備用任務機制的狀況下,會須要多消耗44%的時間。

4、        細化

儘管簡單的編寫Map和Reduce函數提供的基本功能足夠知足大多數須要,可是,咱們發現一些擴展是頗有用的。這會在本章進行描述。

4.1 分區函數

MapReduce的用戶指定所但願的reduce任務/輸出文件的數量(R)。使用分區函數在中間鍵上將數據分區到這些任務上。一個默認的分區函數使用hash方法(如「hash(key) mod R」),它能產生至關平衡的分區。然而,在一些狀況下,須要使用其它的在key上的分區函數對數據進行分區。爲了支持這種狀況,MapReduce庫的用戶可以提供指定的分區函數。例如,使用「hash(Hostname(urlkey)) mod R」做爲分區函數,使全部來自同一個host的URL最終放到同一個輸出文件中。

4.2 順序保證

咱們保證在一個給定的分區內,中間key/value對是根據key值順序增量處理的。順序保證可使它易於生成一個有序的輸出文件,這對於輸出文件須要支持有效的隨機訪問,或者輸出的用戶方便的查找排序的數據頗有幫助。

4.3 組合(Combiner)函數

在一些狀況下,每一個map任務產生的中間key會有不少重複,而且用戶指定的reduce函數知足結合律和交換律。2.1節中提到的單詞技術的例子就是一個很好的例子。由於單詞頻率傾向於zifp分佈,每一個map任務都會產生數百或數千個<the, 1>形式的記錄。全部這些計數都會經過網絡發送給一個單獨的reduce任務,而後經過Reduce函數進行累加併產生一個數字。咱們容許用戶指定一個可選的Combiner函數,它能在數據經過網絡發送前先對這些數據進行局部合併。

Combiner函數在每臺執行map任務的機器上執行。一般狀況下,combiner函數和reduce函數的代碼是相同的,二者惟一不一樣的是MapReduce庫如何處理函數的輸出。Reduce函數的輸出被寫入到一個最終的輸出文件中,而combiner函數會寫入到一個將被髮送給reduce函數的中間文件中。

局部合併能夠有效的對某類MapReduce操做進行加速。附錄A包含了一個使用combiner函數的例子。

4.4 輸入和輸出類型

MapReduce庫支持幾種不一樣格式的輸入數據。好比,「text」模式的輸入能夠講每一行看出一個key/value對:key是該行在文件中的偏移量,value是該行的內容。另外一中常見的支持格式是根據key進行排序存儲一個key/value對的序列。每種輸入類型的實現知道如何將本身分割成對map任務處理有意義的區間(例如,text模式區間分割確保區間分割只在行的邊界進行)。用戶可以經過實現一個簡單的讀取(reader)接口來增長支持一種新的輸入類型,儘管大多數用戶僅僅使用了預約義輸入類型中的一小部分。

Reader並非必須從文件中讀取數據,好比,咱們能夠容易的定義一個從數據庫中讀取記錄,或者從內存的數據結構中讀取數據的Reader。

相似的,咱們提供一組輸出類型來產生不一樣格式的數據,用戶也能夠簡單的經過代碼增長對新輸出類型的支持。

4.5 反作用

在一些狀況下,MapReduce的用戶發現爲它們的map和/或reduce操做的輸出生成輔助的文件很方便。咱們依靠應用的writer將這個反作用變成原子的和冪等的。一般,應用會將結果寫入到一個臨時文件,而後在數據徹底生成後,原子的重命名這個文件。

若是一個單獨任務產生的多個輸出文件,咱們沒有提供兩階段提交的原子操做。所以,產生多個輸出文件且對交叉文件有一致性需求的任務應該是肯定性的操做。可是在實際工做中,這個限制並非一個問題。

4.6 跳過損壞的記錄

有時,在咱們的代碼中會存在一些bug,它們會致使Map或Reduce函數在處理特定的記錄上必定會Crash。這樣的bug會阻止MapReduce操做順利完成。一般的作法是解決這個bug,但有時,這是不可行的;多是因爲第三方的庫中的bug,而咱們沒有這個庫的源碼。有時,忽略一些記錄也是能夠接受的,例如,當在海量的數據集上作數據統計時。咱們提供了一個可選的運行模式,MapReduce庫探測出哪些記錄會致使肯定性的Crash,並跳過這些記錄以繼續執行這個程序。

每一個工做進程都安裝了一個信號處理器,它能捕獲段錯誤和總線錯誤。在調用用戶的Map或Reduce操做以前,MapReduce庫將記錄的序號存儲到全局變量中。若是用戶代碼產生一個信號,這個信號處理器會向MapReudce master發送一個「臨死前」的UDP包,其中包含了這個序號。當master看到對於一個特定的記錄有多個失敗信號時,在相應的Map或Reduce任務下一次從新執行時,master會通知它跳過這個記錄。

4.7 本地執行

在Map或Reduce函數中調試問題是很棘手的,由於實際的計算是發生在一個分佈式系統上的,一般有幾千臺機器,而且是由master動態分配的。爲了有助於調試、性能分析和小規模測試,咱們開發了一個MapReduce庫可供選擇的實現,它將在本地機器上序列化的執行一個MapReduce的全部工做。這爲用戶提供了對MapReduce操做的控制,使計算能被限制在一個特定的map任務上。用戶使用標記調用他們的程序,並可以簡單的使用它們找到的任何調試或測試工具(如,gdb)。

4.8 狀態信息

Master運行了一個內部的HTTP服務,並顯示出狀態集頁面供人們查看,如,有多少任務已經完成、有多少正在處理、輸入的字節數、中間數據的字節數、輸出的字節數、處理速率等。這些頁面也包含了指向每一個任務生成的標準錯誤和標準輸出文件的連接。用戶能使用這些數據預測這個計算將要持續多長時間,以及是否應該向這個計算添加更多的資源。這些頁面也有助於找出計算比預期執行慢的多的緣由。

此外,頂層的狀態頁顯示了哪些工做進程失效,哪些map和reduce任務在處理時失敗。這個信息對試圖診斷出用戶代碼中的bug頗有用。

4.9 計數器

MapReduce庫提供了一個計數器,用於統計不一樣事件的發生次數。好比,用戶代碼想要統計已經處理了多少單詞,或者已經對多少德國的文檔創建了索引等。

用戶代碼可使用這個計數器建立一個命名的計數器對象,而後在Map和/或Reduce函數中適當的增長這個計數器的計數。例如:

 

獨立的工做機器的計數器值週期性的傳送到master(附在ping的響應上)master將從成功的map和reduce任務上獲取的計數器值進行彙總,當MapReduce操做完成時,將它們返回給用戶的代碼。當前的計數器值也被顯示在了master的狀態頁面上,令人們可以看到當前計算的進度。當彙總計數器值時,master經過去掉同一個map或reduce任務的屢次執行所形成的影響來防止重複計數。(重複執行可能會在咱們使用備用任務和從新執行失敗的任務時出現。)

一些計數器的值是由MapReduce庫自動維護的,如已處理的輸入key/value對的數量和已生成的輸出key/value對的數量。

用戶發現計數器對檢查MapReduce操做的行爲頗有用處。例如,在一些MapReduce操做中,用戶代碼可能想要確保生成的輸出對的數量是否精確的等於已處理的輸入對的數量,或者已處理的德國的文檔數量在已處理的全部文檔數量中是否被容忍。

5、        性能

在這章中,咱們測試兩個運行在一個大規模集羣上的MapReduce計算的性能。一個計算在大約1TB的數據中進行特定的模式匹配,另外一個計算對大約1TB的數據進行排序。

這兩個程序可以表明實際中大量的由用戶編寫的MapReduce程序,一類程序將數據從一種表示方式轉換成另外一種形式;另外一類程序是從海里的數據集中抽取一小部分感興趣的數據。

5.1 集羣配置

全部的程序運行在一個由將近1800臺機器組成的集羣上。每一個機器有兩個2GHz、支持超線程的Intel Xeon處理器、4GB的內存、兩個160GB的IDE磁盤和一個1Gbps的以太網鏈路,這些機器部署在一個兩層的樹狀交換網絡中,在根節點處有大約100-200Gbps的帶寬。全部的機器都採用相同的部署,所以任意兩個機器間的RTT都小於1ms。

在4GB內存裏,有接近1-1.5GB用於運行在集羣上的其它任務。程序在一個週末的下午開始執行,這時主機的CPU、磁盤和網絡基本都是空閒的。

5.2 字符串查找(Grep)

這個grep程序掃描了大概1010個100字節大小的記錄,查找出現機率相對較小的3個字符的模式(這個模式出如今92337個記錄中)。輸入被分割成接近64MB的片(M=15000),整個輸出被放到一個文件中(R=1)。

 

圖2:數據傳輸速率

圖2顯示了計算隨時間的進展狀況。Y軸顯示了輸入數據的掃描速率,這個速率會隨着MapReduce計算的機器數量的增加而增加,當1764個工做進程參與計算時,總的速率超過30GB/s。隨着map任務的完成,速率開始降低,並在計算的大約第80秒變爲0,整個計算從開始到結束大約持續了150秒,這包含了大約1分鐘的啓動時間開銷,這個開銷是由將程序傳播到全部工做機器的時間、等待GFS文件系統打開1000個輸入文件集的時間和獲取位置優化所需信息的時間形成的。

5.3 排序

排序程序對1010個100字節大小的記錄(接近1TB的數據)進行排序,這個程序模仿了TeraSort benchmark。

排序程序由不到50行的用戶代碼組成,一個三行的Map函數從一個文本行中抽取出一個10字節的key,並將這個key和原始的文本行做爲中間的key/value對進行輸出。咱們使用內置的Identity函數做爲Reduce操做。這個函數將中間key/value對不作任何修改的輸出,最終排序結果輸出到兩路複製的GFS文件中(如,該程序輸出了2TB的數據)。

如前所述,輸入數據被分割爲64MB大小的片(M=15000),將輸出結果分紅4000個文件(R=4000)。分區函數使用了key的開頭字符將數據分隔到R片中的一個。

這個基準測試的分區函數內置了key的分區信息。在一個普通的排序程序中,咱們將增長一個預處理MapReduce操做,它可以對key進行抽樣,經過key的抽樣分佈來計算最終排序處理的分割點。

 

圖3:對於排序程序的不一樣執行過程隨時間的數據傳輸速率

圖3(a)顯示了排序程序的正常執行過程。左上方的圖顯示了輸入讀取的速率,這個速率峯值大約爲13GB/s,由於全部的map任務執行完成,速率也在200秒前降低到了0。注意,這裏的輸入速率比字符串查找的要小,這是由於排序程序的map任務花費了大約一半的處理時間和I/O帶寬將終結結果輸出到它們的本地磁盤上,字符串查找相應的中間結果輸出幾乎能夠忽略。

左邊中間的圖顯示了數據經過網絡從map任務發往reduce任務的速率。這個緩慢的數據移動在第一個map任務完成時會盡快開始。圖中的第一個峯值是啓動了第一批大概1700個reduce任務(整個MapReduce被分配到大約1700臺機器上,每一個機器每次最多隻執行一個reduce任務)。這個計算執行大概300秒後,第一批reduce任務中的一些執行完成,咱們開始執行剩下的reduce任務進行數據處理。全部的處理在計算開始後的大約600秒後完成。

左邊下方的圖顯示了reduce任務就愛那個排序後的數據寫到最終的輸出文件的速率。在第一個處理週期完成到寫入週期開始間有一個延遲,由於機器正在忙於對中間數據進行排序。寫入的速率會在2-4GB/s上持續一段時間。全部的寫操做會在計算開始後的大約850秒後完成。包括啓動的開銷,整個計算耗時891秒,這與TeraSort benchmark中的最好記錄1057秒類似。

一些事情須要注意:由於咱們的位置優化策略,大多數數據從本地磁盤中讀取,繞開了網絡帶寬的顯示,因此輸入速率比處理速率和輸出速率要高。處理速率要高於輸出速率,由於輸出過程要將排序後的數據寫入到兩個拷貝中(爲了可靠性和可用性,咱們將數據寫入到兩個副本中)。咱們將數據寫入兩個副本,由於咱們的底層文件系統爲了可靠性和可用性提供了相應的機制。若是底層文件系統使用容錯編碼(erasure coding)而不是複製,寫數據的網絡帶寬需求會下降。

5.4 備用任務的做用

在圖3(b)中,咱們顯示了一個禁用備用任務的排序程序的執行過程。執行的流程與如3(a)中所顯示的類似,除了有一個很長的尾巴,在這期間幾乎沒有寫入行爲發生。在960秒後,除了5個reduce任務的全部任務都執行完成。然而,這些落後者只到300秒後才執行完成。整個計算任務耗時1283秒,增長了大約44%的時間。

5.5 機器故障

在圖3(c)中,咱們顯示了一個排序程序的執行過程,在計算過程開始都的幾分鐘後,咱們故意kill掉了1746個工做進程中的200個。底層的調度者會迅速在這些機器上重啓新的工做進程(由於只有進程被殺掉,機器自己運行正常)。

工做進程死掉會出現負的輸入速率,由於一些以前已經完成的map工做消失了(由於香港的map工做進程被kill掉了),而且須要從新執行。這個map任務會至關快的從新執行。整個計算過程在933秒後完成,包括了啓動開銷(僅僅比普通狀況多花費了5%的時間)。

6、        經驗

咱們在2003年2月完成了MapReduce庫的第一個版本,並在2003年8月作了重大的改進,包括位置優化、任務在工做機器上的動態負載均衡執行等。從那時起,咱們驚喜的發現,MapReduce庫可以普遍的用於咱們工做中的各類問題。它已經被用於Google內部普遍的領域,包括:

  • 大規模機器學習問題
  • Google新聞和Froogle產品的集羣問題
  • 抽取數據用於公衆查詢的產品報告
  • 從大量新應用和新產品的網頁中抽取特性(如,從大量的位置查詢頁面中抽取地理位置信息)
  • 大規模圖形計算

 

圖4:隨時間變化的MapReduce實例

圖4中顯示了在咱們的源碼管理系統中,隨着時間的推移,MapReduce程序的數量有明顯的增長,從2003年早期的0增長到2004年9月時的900個獨立的實例。MapReduce如此的成功,由於它使利用半個小時編寫的一個簡單程序可以高效的運行在一千臺機器上成爲可能,這極大的加快了開發和原型設計的週期。此外,它容許沒有分佈式和/或並行系統經驗的開發者可以利用這些資源開發出分佈式應用。

 

表1: 2004年8月運行的MapReduce任務

在每一個工做的最後,MapReduce庫統計了工做使用的計算資源。在表1中,咱們看到一些2004年8月在Google內部運行的MapReduce工做的一些統計數據。

6.1 大規模索引

目前爲止,MapReduce最重要的應用之一就是完成了對生產索引系統的重寫,它生成了用於Google網頁搜索服務的數據結構。索引系統的輸入數據是經過咱們的爬取系統檢索到的海量文檔,存儲爲就一個GFS文件集合。這些文件的原始內容還有超過20TB的數據。索引程序是一個包含了5-10個MapReduce操做的序列。使用MapReduce(代替了以前版本的索引系統中的adhoc分佈式處理)有幾個優勢:

  • 索引程序代碼是一個簡單、短小、易於理解的代碼,由於容錯、分佈式和並行處理都隱藏在了MapReduce庫中。好比,一個計算程序的大小由接近3800行的C++代碼減小到使用MapReduce的大約700行的代碼。
  • MapReduce庫性能很是好,以致於可以將概念上不相關的計算分開,來代替將這些計算混合在一塊兒進行,避免額外的數據處理。這會使索引程序易於改變。好比,對以前的索引系統作一個改動大概須要幾個月時間,而對新的系統則只須要幾天時間。
  • 索引程序變得更易於操做,由於大多數因爲機器故障、機器處理速度慢和網絡的瞬間阻塞等引發的問題都被MapReduce庫自動的處理掉,而無需人爲的介入。

7、        相關工做

許多系統都提供了有限的程序模型,而且對自動的並行計算使用了限制。好比,一個結合函數能夠在logN時間內在N個處理器上對一個包含N個元素的數組使用並行前綴計算,來獲取全部的前綴[6,9,13]。MapReduce被認爲是這些模型中基於咱們對大規模工做計算的經驗的簡化和精華。更爲重要的是,咱們提供了一個在數千個處理器上的容錯實現。相反的,大多數並行處理系統只在較小規模下實現,並將機器故障的處理細節交給了程序開發者。

Bulk Synchronous Programming和一些MPI源於提供了更高層次的抽象使它更易於讓開發者編寫並行程序。這些系統和MapReduce的一個關鍵不一樣點是MapReduce開發了一個有限的程序模型來自動的並行執行用戶的程序,並提供了透明的容錯機制。

咱們的位置優化機制的靈感來自於移動磁盤技術,計算用於處理靠近本地磁盤的數據,減小數據在I/O子系統或網絡上傳輸的次數。咱們的系統運行在掛載幾個磁盤的普通機器上,而不是在磁盤處理器上運行,可是通常方法是相似的。

咱們的備用任務機制與Charlotte系統中採用的eager調度機制相似。簡單的Eager調度機制有一個缺點,若是一個給定的任務形成反覆的失敗,整個計算將以失敗了結。咱們經過跳過損壞計算路的機制,解決了這個問題的一些狀況。

MapReduce實現依賴了內部集羣管理系統,它負責在一個大規模的共享機器集合中分發和運行用戶的任務。儘管不是本篇文章的焦點,可是集羣管理系統在本質上與像Condor的其它系統相似。

排序功能是MapReduce庫的一部分,與NOW-Sort中的操做相似。源機器(map工做進程)將將要排序的數據分區,並將其發送給R個Reduce工做進程中的一個。每一個reduce工做進程在本地對這些數據進行排序(若是可能的話就在內存中進行)。固然NOW-Sort沒有使MapReduce庫可以普遍使用的用戶定義的Map和Reduce函數。

River提供了一個編程模型,處理進程經過在分佈式隊列上發送數據來進行通訊。像MapReduce同樣,即便在不均勻的硬件或系統顛簸的狀況下,River系統依然試圖提供較好的平均性能。River系統經過當心的磁盤和網絡傳輸調度來平衡完成時間。經過限制編程模型,MapReduce框架可以將問題分解成不少細顆粒的任務,這些任務在可用的工做進程上動態的調度,以致於越快的工做進程處理越多的任務。這個受限制的編程模型也容許咱們在工做將要結束時調度冗餘的任務進行處理,這樣能夠減小不均勻狀況下的完成時間。

BAD-FS與MapReduce有徹底不一樣的編程模型,不像MapReduce,它是用於在廣域網下執行工做的。然而,它們有兩個基本類似點。(1)兩個系統都使用了從新執行的方式來處理因故障而丟失的數據。(2)兩個系統都本地有限調度原則來減小網絡鏈路上發送數據的次數。

TASCC是一個用於簡化結構的高可用性的網絡服務。像MapReduce同樣,它依靠從新執行做爲一個容錯機制。

8、        總結

MapReduce編程模型已經成功的應用在Google內部的許多不一樣的產品上。咱們將這個成功歸功於幾個緣由。第一,模型很易用,即便對那些沒有並行計算和分佈式系統經驗的開發者,由於它隱藏了並行處理、容錯、本地優化和負載均衡這些處理過程。第二,各類各樣的問題都能用MapReduce計算簡單的表示出來,例如,MapReduce被Google網頁搜索服務用於生成數據、排序、數據挖掘、機器學習和許多其它系統。第三,咱們已經實現了擴展到由數千臺機器組成的大規模集羣上使用的MapReduce。這個實現可以有效的利用這些機器自由,所以適合在Google內部遇到的不少海量計算問題。

咱們從這項工做中學到了幾樣東西。第一,限制程序模型使得並行計算和分佈式計算變得容易,也容易實現這樣的計算容錯。第二,網絡帶寬是一個稀有的資源,所以咱們系統中的不少優化的目標都是爲了減小數據在網絡上的傳輸次數:位置優化容許咱們從本地磁盤讀取數據,並將中間數據的一個拷貝寫入到本地磁盤,以此來節省網絡帶寬的使用。第三,冗餘執行可以用於減小容許速度慢的機器所形成的影響,而且可以處理機器故障和數據丟失。

相關文章
相關標籤/搜索