MapReduce 是谷歌 2004 年(Google 內部是從03年寫出第一個版本)發表的論文裏提出的一個概念。雖然已通過去15 年了,但如今回顧這個大數據時代始祖級別概念的背景、原理和實現,仍能得到對分佈式系統的不少直覺性的啓發,所謂溫故而知新。node
在Google 的語境裏,MapReduce 既是一種編程模型,也是支持該模型的一種分佈式系統實現。它的提出,讓沒有分佈式系統背景的開發者,也能較輕鬆的利用大規模集羣以高吞吐量的方式來處理海量數據。其解決問題思路很值得借鑑:找到需求的痛點(如海量索引如何維護,更新和排名),對處理關鍵流程進行高階抽象(分片Map,按需Reduce),以進行高效的系統實現(所謂量體裁衣)。這其中,如何找到一個合適的計算抽象,是最難的部分,既要對需求有直覺般的瞭解,又要具備極高的計算機科學素養。固然,而且可能更爲接近現實的是,該抽象是在根據需求不斷試錯後進化出的海水之上的冰山一角。python
谷歌當時做爲互聯網的最大入口,維護着世界全網索引,最先觸到了數據量的天花板。即,哪怕針對很簡單的業務邏輯:如從爬蟲數據中生成倒排索引、將圖狀網頁集合用不一樣方式組織、計算每一個主機爬取的網頁數量、給定日期的高頻查詢詞彙等等,在全球互聯網數據的尺度的加成下,也變的異常複雜。c++
這些複雜性包括:輸入數據分散在很是多的主機上、計算耗資源太多單機難以完成、輸出數據須要跨主機進行從新組織。爲此,不得不針對每一個需求重複構造專用系統,並耗費大量代碼在分發數據和代碼、調度和並行任務、應對機器故障和處理通訊失敗等問題上。git
map 和 reduce 的抽象靈感來自於函數式編程語言 Lisp,爲何選定這兩個概念呢?這來源於谷歌人對其業務的高度提煉:首先輸入能夠切分紅一個個邏輯的記錄 (record);而後對其每一個 record 執行某種映射 (map) 操做,生成一些鍵值對組成的中間結果(爲何要分鍵和值呢?爲最後一步作鋪墊,容許用戶將中間結果以任意指定的方式——鍵,來進行組織規約);最後在具備相同鍵的中間結果子集上執行規約(reduce ,包括排序,統計,提取最值等等)操做。github
函數式模型的另外一個特色在於對 map 操做實現的約束,即規定用戶應提供一個無反作用的 map 操做(相關概念有純函數,肯定性,冪等性等等,固然他們的概念並不同,後面小結會詳細討論)。如此限制,好處有二,能夠進行大規模並行執行,能夠經過換地兒重試來屏蔽主機故障。算法
具體到落地上,map 和 reduce 都是用戶自定義函數。map 函數接受一個 Record,不過爲了靈活,通常也組織爲鍵值對;而後產生 List[key, value],reduce 函數接受一個 key 和該 key 對應的全部中間結果 List[value]。即:數據庫
map (k1,v1) -→ list(k2,v2)
reduce (k2,list(v2)) -→ list(v2)
複製代碼
拿由谷歌這篇論文提出,後來成爲大數據處理界的 hello world 級別示例程序 Word Count (對一堆文檔中的單詞計數)來講,map 和 reduce 的實現長這樣:編程
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
複製代碼
這裏有兩個有意思的點:api
抽象定了,那麼實現天然能夠有不一樣,這也是接口和實現分離的意義所在。前者的抽象是一種思想,谷歌已經給你作了;後者的實現,徹底能夠根據本身的生產環境進行量體裁衣的來定製實現。谷歌在 paper 中給了一種內部經典版,Hadoop 也提供了一套通用版,固然咱們也能夠根據本身的業務需求和場景約束來實現一個合身版。緩存
谷歌發佈論文時 實現 MapReduce 所面對的系統環境長這樣:
輸入由用戶指定切分大小後,切分紅 M 份,而後分散到不一樣機器上(因爲 GFS 的存在,也可能該輸入 Block 原本就在那臺機器上)。每一個機器上會並行的跑用戶定義的 map 。map 輸出的中間結果,亦由用戶指定,按 key 值範圍切分爲 R 份,對於每一箇中間結果,經過 node label = hash(key) mod R 決定其去處。下面是流程概覽圖:
通常而言,用戶無需將最終結果的 R 個 Partition 進行合併,而是將其直接做爲下一個 MapReduce 任務的輸入。Spark RDD 的partition 就是將這一特色概念化了,而且將每一步 MapReduce 輸出也放內存中,不進行落盤,以下降連續 MapReduce 任務的延遲。
計算機科學中經常使用的一個原理,叫作*局部性原理*** (locality reference,這裏特指空間局部性),說的是程序在順序執行時,訪問了一塊數據,接下來大機率會訪問該數據(物理位置上)旁邊的一塊數據。很樸素的斷言,倒是一切 cache 發揮做用的基礎,計算機存儲所以也造成了由慢到快,由賤到貴,由大到小的存儲層次體系(硬盤-> 內存 -> 緩存 -> 寄存器)。在分佈式環境中,這個層次體系至少還要再罩上一層——網絡IO。
在 MapReduce 系統中,咱們也會充分利用輸入數據的 locality。只不過此次,不是將數據加載過來,而是將程序調度過去(Moving Computation is Cheaper than Moving Data)。若是輸入存在 GFS 上,表現形式將爲一系列的邏輯 Block,每一個 Block 可能會有幾個(通常是三個)物理副本。對於輸入每一個邏輯 Block,咱們能夠在其某個物理副本所在機器上運行 Map Task(若是失敗,就再換一個副本),由此來儘可能減少網絡數據傳輸。從而下降了延遲,節約了帶寬。
谷歌的 MapReduce 實現是有做業(Job)級別的封裝的,每一個 Job 包含一系列任務(Task),即 Map Task 和 Reduce Task。那麼,咱們要維護一個正在運行的 Job 的元信息,就勢必要保存全部正在執行的 Task 的狀態,其所在的機器 ID 等等。並且,Master 事實上充當了 Map Task 輸出到 Reduce Task 輸入的一個"管道"。每個 Map Task 結束時,會將其輸出的中間結果的位置信息通知 Master,Master 再將其轉給對應的 Reduce Task,Reduce Task 再去對應位置拉取對應 size 的數據。注意,因爲 Map Task 的結束時間不統一,這個***通知->轉發-> 拉取*** 的過程是增量的。那麼不難推測出,reduce 側對中間數據排序的應該是一個不斷 merge 的過程,不大多是等全部數據就位了再全局排序。
在分佈式系統中,一個比較忌諱的問題就是單點。由於是牽一髮而動全身,而 Master 就是這樣一個單點。固然單個機器的統計平均故障率並不高,可是一旦故障,那麼整個集羣都將不可用。但同時,有一個 Leader 節點會大大簡化分佈式系統的的設計;所以採用單點 Master 的系統反而是主流,那勢必須要開發一些其餘手段來強化 master 的容錯能力,好比說記 log + snapshot、好比說主從備份、好比說每次從 worker 心跳進行狀態重建、好比說用其餘實現了分佈式一致性協議的系統來保存元信息等等。
集羣中有 Master 和 Worker 兩種機器角色。
Worker 因爲數量大,有機器故障機率較大。在分佈式系統中,Master 獲取 Workers 的信息,最多見即是心跳,既能夠是 master ping worker,也能夠反過來,也能夠兼而有之。master 經過心跳發現某些 worker 不可到達後(多是 worker 死掉了,也多是網絡出問題了等),就會將該 Worker 打個故障(failed)的標記。
以前已經調度到該故障 Worker 上的任務(Task) 很顯然有兩種類型: Map Task 和 Reduce Task。對於 Map Task(如下所提的 Task,確定是從屬於未結束的 Job) ,無論成功與否,咱們都要進行重試,由於一旦該 Worker 變爲不可達,存於其上的中間結果也隨之沒法被 Reduce Task 獲取。固然,咱們能夠在 Master 中多記點狀態來減小對已完成的 Map Task 進行重試的機率。好比記下某個 Map Task 的輸出是否已經都被 Reduce Task 拉取,以決定要不要對正常完成的 Map Task 進行重試,但無疑會提升系統複雜度。*工程每每會對環境作某些假設, 以簡化某些實現;*咱們假設 worker 失敗率不是那麼高,或者重試全部 Map Task 的代價能夠忍,那麼就能夠簡化一點實現,以保持系統的簡約,減小可能的 bug。對於 Reduce Task,未完成的無疑要進行重試,已經完成的,因爲其輸出結果咱們假設會寫到全局分佈式系統文件中(即某些機器掛了也不影響),就不會重試。
具體重試的方法,能夠標記須要重試的 Task 的狀態爲 idle,以告訴調度器,該 Task 能夠從新被調度。固然,也能夠實現爲從一個(工做/完成)隊列倒騰到另外一個(就緒)隊列,本質上是同樣的,都是合理實現一個 Task 的狀態機。
至於 master 的故障恢復,上一節稍有提到。若是在實踐中 Master 確實不多死掉,而且偶爾死掉形成全部正在運行的任務失敗的後果也能夠接受,那麼就能夠粗暴的實現爲若是 Master 死掉,就簡單通知全部正在運行的任務的用戶代碼任務失敗了(好比返回非 0 值),而後有用戶代碼自行決定丟棄任務仍是待集羣重啓後進行重試:
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
複製代碼
若是業務對於宕機時間有要求,或者大面積任務失敗不能夠忍受,那麼就須要加強 Master 的容錯性。經常使用的方法上節有提到,這裏展開一下:
還值得一提的是,容錯也須要用戶側代碼作配合。由於框架會對不成功的 map/reduce 用戶代碼進行重試。這就要求,用戶提供的 map/reduce 邏輯符合肯定性(Deterministic):即函數的輸出依賴且僅依賴於輸入,而不取決任何其餘隱形的輸入或者狀態。固然,這個蘊含了冪等性(Idempotency):屢次執行和一次執行效果同樣;可是冪等性並不能推出肯定性;假設有這麼一個函數,它第一次執行形成了一些狀態改變(好比某些釋放資源的 dispose 函數),然後續發現狀態已經改變過了就再也不改變該狀態,那麼它符合冪等性;可是因爲其含有隱式狀態輸入,不是肯定性的。
若是 map/reudce 函數是肯定性的,那麼框架就能夠放心大膽重試了。某些條件下,冪等性也能夠接受,好比保存隱式狀態的地方很牢靠。舉個栗子,咱們依賴於一個文件鎖作判斷某個函數執行了一次或屢次,若是該文件鎖所依賴的文件系統很穩定,而且提供分佈式一致性,那麼就徹底能夠。若是是用 nfs 的一個文件作鎖,來實現的所謂冪等性就值得商榷了。
若是 map/reduce 函數是肯定性的,框架會經過其輸出提交的原子性來進行冪等性保證。即,即便重試了屢次,也和只執行了一次同樣。具體來講,對於 Map Task,會產生 R 個臨時文件,並在結束時將其位置發送給 Master;Master 在收到屢次同一分片(split) 的位置信息時,若是該分片前面某次結果來源仍可用或者已經被消費,那麼就忽略掉該請求後面的全部請求。對於 Reduce Task,其生成的結果也會先寫成臨時文件,而後依賴於底層文件系統的原子性的更名操做(原子性更名也是一個多進程競爭的經典的操做,由於生成文件過程比較長,不容易作成原子的,可是判斷具備某名字的文件是否存在並更名卻很容易作成原子的),在處理完成時改變爲目的文件名。若是發現已經有一個具備該目的文件名的文件了,就放棄更名操做,從而保證了該 Reduce Task只有一個成功輸出的最終文件。
一個 MapReduce Job 中會產生 M+R 個 Task,具體 M 和 R 的值在運行以前能夠由人進行配置。不一樣的系統實現可能會有發揮出最佳系統性能的不一樣配比;可是同時要兼顧業務需求,好比輸入大小,輸出文件個數等等。
在實際業務中,因爲某些主機緣由常會出現長尾效應,即少數幾個 Map/Reduce Task 老是會巨慢的拖到最後,甚至拖得時間甚至是其餘任務的幾倍。形成這些主機拖後腿的緣由能夠舉出不少,如:某個機器硬盤老化,讀寫速度幾十倍的變慢;又好比調度器調度的有問題,致使某些機器負載過高,具備大量的 CPU、內存、硬盤和網絡帶寬的爭搶;又好比軟件 bug 致使某些主機變慢等等。
只要肯定這些問題只發生在少數某些主機上,那麼解決方法也很簡單。在任務接近尾聲的時候(好比統計剩餘task的佔比小於一個閾值時),對於每一個仍然在跑的任務,分別額外調度一份到其餘主機上,那麼大機率會讓這些任務提早完成,同一任務跑屢次的處理邏輯,和容錯重試形成跑屢次是一致的,能夠複用。
此外,咱們能夠經過實際業務檢驗來微調該閾值(包括怎麼斷定任務結尾,啓用幾個備份任務),在耗費額外資源代價和減小任務總用時以前取得一個平衡。
除了 Mapper 和 Reducer 這兩個最基本的源語,該系統還提供了一些其餘的後來事實上也成爲標配的擴展:Partitioner,Combiner 和 Reader/Writer。
默認來講,對 Map 輸出的中間結果進行劃分會使用相似於 hash(key) mod R 這種應用無關的劃分算法。可是有時候用戶有需求將特定的一些 keys 路由到同一個 Reduce Task,好比說中間結果的 key 是 URL, 用戶想按網站 host 進行彙總處理。這時候就須要將系統的這部分路由功能開放給用戶,以知足用戶的定製需求。
若是該 Job 針對全部中間結果的 reduce 的操做知足結合律,那麼指定 Combiner 會很能提升效率。拿的 Word Count 來講,數值的加法無疑知足結合律,也就是說,同一個單詞的頻次,在 Map Task 輸出後進行加和(在 Map Work 上),仍是在 Reduce Task 中進行加和(在 Reduce Worker上),結果保持一致;而這樣一來,因爲一些中間結果對進行了 combine,Map Task 到 Reduce Task 間的傳輸數據量會小不少,從而提升整個 Job 的效率。
也能夠看出,combine 函數通常和 reduce 函數是同樣的,由於他們本質上是對 value set 執行了同一種操做,只不過執行時,執行的地點不同,結合的順序不同。目的是爲了減小中間結果傳輸量,加速任務執行過程。
若是不將定製輸入輸出的能力開放給用戶,那麼系統顯然只能處理有限幾種默認約定的格式。所以,reader 和 writer 接口本質上是系統和現實繁雜的業務之間的適配器(Adaptor)。它們讓用戶能夠自行指定數據的來源和去處、按須要理解輸入內容和自由定製輸出格式。
有了這兩個 Adaptor,系統才能適配更多的業務。通常來講,系統會內置提供一些常見的 Reader 和 Writer 的實現;包括按行讀文本文件,讀文件中鍵值,讀數據庫等等。而後用戶能夠實現這兩個接口,進行更具體的定製。系統常經過相似這種經常使用腳手架+進一步定製能力來提供API,下面的 Counter 也是如此。
有些用戶實現的 map/reduce 函數會有一些反作用,好比說在執行任務中間輸出一些文件、寫一些數據庫條目等等。通常來講這些反作用的原子性和冪等性須要用戶本身來處理。由於若是輸出介質不歸入 MapReduce 系統,系統是沒有辦法保證這些輸出的冪等性和原子性的。不過有的系統就這麼幹的,提供一些某種類型/介質的狀態或者數據存儲,歸入系統中,而且提供一些容錯和冪等的性質。好像 MillWheel 有相似的作法。但這樣會大大加劇系統的複雜性。
若是用戶代碼有 bug 或者某些輸入有問題,會致使 Map 或者 Reduce 任務在運行時崩潰。固然這些 bug 或者輸入能修則修,可是有些狀況因爲第三方庫或者輸入的緣由,不可以進行修復。而在某些類型的任務,好比說訓練數據集清洗、大型統計任務,丟幾個是能夠容忍的。針對這種狀況,系統會提供一種模式,在這種模式中會跳過這些 Record 記錄的執行。
具體實現上來講,也比較簡單。能夠給每一個輸入 Record 給個惟一編號(單次任務內惟一就行);若是某個 Record 處理時掛掉了,就將其編號彙報給 Master。若是 Master 收到了某個 Record 超過一次的處理失敗信息,就將其跳過。作的再細一點,還能夠記下錯誤類型和信息進行比對,來肯定這是不是一個肯定性(deterministic)的錯誤,進而決定是否將其跳過。
衆所周知,分佈式系統很難跟蹤、調試;由於一個 Job 可能同時分散在數千臺機器上進行執行。所以系統提供了本地運行 Job 的能力。能夠針對小數據集輸入對代碼的正確性進行測試。因爲在單機運行,就能夠比較方便經過調試工具進行斷點追蹤。
實現一個本地 mock 系統,通常來講比較簡單。由於不須要考慮網絡間狀態通訊,代碼多節點分發,多機調度等一系列分佈式系統的問題。但卻能極大方便代碼調試,性能測試和小數據集測試。
對於分佈式執行的 Job,一個任務進度等信息可視化界面(給系統集成一個 HTTP 服務,實時拉取系統信息進行展現)有時候是相當重要的,它是系統易用性的關鍵。若是系統用戶不可以很方便的實時監控任務的運行進度、執行速度、資源用量、輸出位置,出錯信息以及其餘一些任務的元信息,就不能對任務的執行情況有個感性的把握。尤爲是若是寫 MapReduce 程序的人和跑這些程序的不是一我的時,會更爲依賴這些狀態的實時呈現。
所以,對於分佈式系統來講,其易用性有一大半落在一個良好的系統信息呈現上。使用者須要據此來預測任務的完成時間、資源的吃緊程度等等,從而作出相應決策。
此外,對與集羣機器狀態信息,也須要進行跟蹤,由於機器的負載信息、故障信息、網絡情況等等對用戶任務的執行也有不一樣程度的影響。給出這些機器狀態信息,有助於對用戶代碼甚至系統代碼進行出錯診斷。
系統提供了一種計數服務,以統計某種事件的發生頻次。好比用戶想統計 Word Count 示例中所處理的全大寫單詞的總數:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
複製代碼
從代碼能夠大體猜想其實現:定義的時候,須要給 Counter 指定一個 Id。而後在 Map/Reudce 代碼中能夠經過該 Id 獲取該 Counter 而後進行計數。每一個 worker 機器上的計數信息會彙總到 Master 上,而後按 Counter 的 ID 進行加和,而且最終返回給用戶。同時,前述展現狀態信息頁面也會將這些計數器進行可視化(好比說打折線圖)。其中有個點須要注意,就是屢次對重試的任務(因爲機器死掉或者避免長尾進行的重試)的計次進行去重;能夠按照 Map/Reduce ID 來進行去重,即咱們假定同一輸入的重試任務共享一個 Task ID(事實上爲了知足重試需求和任務管理需求,分佈式系統確定會對全部任務進行惟一編號的),針對具備相同 Task ID 內部的 Counter 的計次,Master 只保留第一次成功的那一份;可是若是計數須要在頁面上實時顯示,可能就須要作適當信息保留,而且在該 Task 重試時進行計數回退之類的操做。
系統會自動維持一些計數器,好比說全部已經處理的鍵值對的數量和全部已經產生的鍵值對數量。全局計數操做對於某些應用用處很大,好比說有的應用要求全部輸入鍵值對和輸出鍵值對的數量同樣,若是沒有全局計數,就無從驗證;或者統計一些數據的全局比例等等。
自 Spark 成名以後,shuffle 這個 MapReduce 中的語義獲得了不少研究和實踐。這是一個多機傳輸的耗時操做,其實現的高效性對系統的性能有着相當重要的做用,所以單獨拿出一節來聊聊。
在 MapReduce 中就是指 Map Task 分片輸出到 Reduce Task 按需拉取的這麼一個過程。還拿 Word Count 爲例,你想統計某個單詞在全部文檔中的總頻次,可是這些單詞分佈在不一樣機器上的不一樣的 Map Task 輸出裏;而只有將全部一樣單詞的頻次對彙集到同一臺機器上,才能對其加和。這種將機器和子數據集對應關係按key打亂重組的操做,咱們姑且稱之爲 shuffle。
在 Spark 中,基本上繼承了該語義,而且更通用化了。一個常見的例子是 join,即將兩個 Table 間具備相同 key 的記錄路由到同一臺機器上,從而在全部機器上按 key 分片進行並行 join,從而大幅提升效率。相似於 join 這樣的高階操做,會使得底層的 Partition 不能繼續在本機運行而不與其餘 Partition 發生聯繫,所以 shuffle 也是 Spark 中劃分 Stage 的一個分水嶺。
對於 MapReduce 系統來講,使用的 shuffle 策略相似於 Spark 中基於排序的 shuffle。Map 首先將中間結果寫到內存中,而後按期刷盤,刷盤時進行歸併排序。而後 Reducer 端按需拉取,從多個 Mapper 端拉取數據後,再次進行歸併排序,而後傳給 Reduce 函數。這樣的好處在於能夠進行大規模數據處理,由於能夠作外部排序,也能夠作迭代惰性加載。對於 Hadoop 的實現來講,將包含 shuffle 的整個流程分爲了明顯的幾個階段:map(), spill, merge, shuffle, sort, reduce() 等。
一些缺點:經過 MapReduce 的系統設計能夠看出,它是一個高吞吐,可是也高延遲的批量處理系統。而且不支持迭代。這也是後續 Spark,Flink 這樣系統火熱的動機。
文件系統: MapReduce 只有和 GFS 這樣支持分塊、多進程併發寫的大文件系統配合才能發揮出更大的優點,優化輸入和輸出的性能。此外,這種分佈式文件系統還會屏蔽底層節點故障。
組織形式: MapReduce 是一個系統,須要部署到集羣上,但它同時又是一個庫,讓用戶代碼和分佈式集羣進行交互而不太用關心分佈式環境中的問題的一個庫。每一個任務須要寫任務描述(MapReduceSpecification),而後提交給系統——這是庫經常使用的一種提交任務的手段。
代碼分發:谷歌的 MapReduce 具體實現不得而知。猜想能夠有兩種方式:一是將 MapReduce 庫代碼 + 用戶代碼總體在不一樣機器上 fork ,而後根據角色不一樣來執行不一樣分支。二是將各個機器上的服務先啓動起來,而後執行任務時只會將用戶自定義函數序列化後傳輸到不一樣機器。
Intermediate result:map 函數產生的中間結果,以鍵值對形式組織。
**Map Task **:這個應該都是指的 Worker 機器上,執行 map 函數的工做進程。
Map Worker:Map Task 所運行的 Worker 機器。全部 Worker 應該是沒有角色標記的,既能夠執行 Map Task,也能夠執行 Reduce Task,以充分的利用機器性能。
[1] Jeffrey Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters
[2] Alexey Grishchenko, Spark Architecture: Shuffle
[3] JerryLead, Spark internals