轉自 https://www.csdn.net/article/2014-05-19/2819831-TDW-Shuffle/1數組
摘要:騰訊分佈式數據倉庫基於開源軟件Hadoop和Hive進行構建,TDW計算引擎包括兩部分:MapReduce和Spark,二者內部都包含了一個重要的過程—Shuffle。本文對Shuffle過程進行解析,並對兩個計算引擎的Shuffle過程進行比較。緩存
騰訊分佈式數據倉庫(Tencent distributed Data Warehouse, 簡稱TDW)基於開源軟件Hadoop和Hive進行構建,而且根據公司數據量大、計算複雜等特定狀況進行了大量優化和改造,目前單集羣最大規模達到5600臺,每日做業數達到100多萬,已經成爲公司最大的離線數據處理平臺。爲了知足用戶更加多樣的計算需求,TDW也在向實時化方向發展,爲用戶提供更加高效、穩定、豐富的服務。網絡
TDW計算引擎包括兩部分:一個是偏離線的MapReduce,一個是偏實時的Spark,二者內部都包含了一個重要的過程——Shuffle。本文對Shuffle過程進行解析,並對兩個計算引擎的Shuffle過程進行比較,對後續的優化方向進行思考和探索,期待通過咱們不斷的努力,TDW計算引擎運行地更好。數據結構
Shuffle的本義是洗牌、混洗,把一組有必定規則的數據儘可能轉換成一組無規則的數據,越隨機越好。MapReduce中的Shuffle更像是洗牌的逆過程,把一組無規則的數據儘可能轉換成一組具備必定規則的數據。架構
爲何MapReduce計算模型須要Shuffle過程?咱們都知道MapReduce計算模型通常包括兩個重要的階段:Map是映射,負責數據的過濾分發;Reduce是規約,負責數據的計算歸併。Reduce的數據來源於Map,Map的輸出便是Reduce的輸入,Reduce須要經過Shuffle來獲取數據。併發
從Map輸出到Reduce輸入的整個過程能夠廣義地稱爲Shuffle。Shuffle橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和sort過程,如圖所示:框架
Spill過程包括輸出、排序、溢寫、合併等步驟,如圖所示:socket
Collect分佈式
每一個Map任務不斷地以<key, value>對的形式把數據輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是爲了更有效地使用內存空間,在內存中放置儘量多的數據。ide
這個數據結構其實就是個字節數組,叫Kvbuffer,名如其義,可是這裏面不光放置了<key, value>數據,還放置了一些索引數據,給放置索引數據的區域起了一個Kvmeta的別名,在Kvbuffer的一塊區域上穿了一個IntBuffer(字節序採用的是平臺自身的字節序)的馬甲。<key, value>數據區域和索引數據區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來劃分二者,分界點不是亙古不變的,而是每次Spill以後都會更新一次。初始的分界點是0,<key, value>數據的存儲方向是向上增加,索引數據的存儲方向是向下增加,如圖所示:
Kvbuffer的存放指針bufindex是一直悶着頭地向上增加,好比bufindex初始值爲0,一個Int型的key寫完以後,bufindex增加爲4,一個Int型的value寫完以後,bufindex增加爲8。
索引是對<key, value>在kvbuffer中的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,佔用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個「格子」,而後再向上一個格子一個格子地填充四元組的數據。好比Kvindex初始位置是-4,當第一個<key, value>寫完以後,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,而後Kvindex跳到-8位置,等第二個<key, value>和索引寫完以後,Kvindex跳到-32位置。
Kvbuffer的大小雖然能夠經過參數設置,可是總共就那麼大,<key, value>和索引不斷地增長,加着加着,Kvbuffer總有不夠用的那天,那怎麼辦?把數據從內存刷到磁盤上再接着往內存寫數據,把Kvbuffer中的數據刷到磁盤上的過程就叫Spill,多麼明瞭的叫法,內存中的數據滿了就自動地spill到具備更大空間的磁盤。
關於Spill觸發的條件,也就是Kvbuffer用到什麼程度開始Spill,仍是要講究一下的。若是把Kvbuffer用得死死得,一點縫都不剩的時候再開始Spill,那Map任務就須要等Spill完成騰出空間以後才能繼續寫數據;若是Kvbuffer只是滿到必定程度,好比80%的時候就開始Spill,那在Spill的同時,Map任務還能繼續寫數據,若是Spill夠快,Map可能都不須要爲空閒空間而發愁。兩利相衡取其大,通常選擇後者。
Spill這個重要的過程是由Spill線程承擔,Spill線程從Map任務接到「命令」以後就開始正式幹活,乾的活叫SortAndSpill,原來不只僅是Spill,在Spill以前還有個頗具爭議性的Sort。
先把Kvbuffer中的數據按照partition值和key兩個關鍵字升序排序,移動的只是索引數據,排序結果是Kvmeta中數據按照partition爲單位彙集在一塊兒,同一partition內的按照key有序。
Spill線程爲此次Spill過程建立一個磁盤文件:從全部的本地目錄中輪訓查找能存儲這麼大空間的目錄,找到以後在其中建立一個相似於「spill12.out」的文件。Spill線程根據排過序的Kvmeta挨個partition的把<key, value>數據吐到這個文件中,一個partition對應的數據吐完以後順序地吐下個partition,直到把全部的partition遍歷完。一個partition在文件中對應的數據也叫段(segment)。
全部的partition對應的數據都放在這個文件裏,雖然是順序存放的,可是怎麼直接知道某個partition在這個文件中存放的起始位置呢?強大的索引又出場了。有一個三元組記錄某個partition對應的數據在這個文件中的索引:起始位置、原始數據長度、壓縮以後的數據長度,一個partition對應一個三元組。而後把這些索引信息存放在內存中,若是內存中放不下了,後續的索引信息就須要寫到磁盤文件中了:從全部的本地目錄中輪訓查找能存儲這麼大空間的目錄,找到以後在其中建立一個相似於「spill12.out.index」的文件,文件中不光存儲了索引數據,還存儲了crc32的校驗數據。(spill12.out.index不必定在磁盤上建立,若是內存(默認1M空間)中能放得下就放在內存中,即便在磁盤上建立了,和spill12.out文件也不必定在同一個目錄下。)
每一次Spill過程就會最少生成一個out文件,有時還會生成index文件,Spill的次數也烙印在文件名中。索引文件和數據文件的對應關係以下圖所示:
話分兩端,在Spill線程如火如荼的進行SortAndSpill工做的同時,Map任務不會所以而停歇,而是一無既往地進行着數據輸出。Map仍是把數據寫到kvbuffer中,那問題就來了:<key, value>只顧着悶頭按照bufindex指針向上增加,kvmeta只顧着按照Kvindex向下增加,是保持指針起始位置不變繼續跑呢,仍是另謀它路?若是保持指針起始位置不變,很快bufindex和Kvindex就碰頭了,碰頭以後再從新開始或者移動內存都比較麻煩,不可取。Map取kvbuffer中剩餘空間的中間位置,用這個位置設置爲新的分界點,bufindex指針移動到這個分界點,Kvindex移動到這個分界點的-16位置,而後二者就能夠和諧地按照本身既定的軌跡放置數據了,當Spill完成,空間騰出以後,不須要作任何改動繼續前進。分界點的轉換以下圖所示:
Map任務總要把輸出的數據寫到磁盤上,即便輸出數據量很小在內存中所有能裝得下,在最後也會把數據刷到磁盤上。
Map任務若是輸出數據量很大,可能會進行好幾回Spill,out文件和Index文件會產生不少,分佈在不一樣的磁盤上。最後把這些文件進行合併的merge過程閃亮登場。
Merge過程怎麼知道產生的Spill文件都在哪了呢?從全部的本地目錄上掃描獲得產生的Spill文件,而後把路徑存儲在一個數組裏。Merge過程又怎麼知道Spill的索引信息呢?沒錯,也是從全部的本地目錄上掃描獲得Index文件,而後把索引信息存儲在一個列表裏。到這裏,又遇到了一個值得納悶的地方。在以前Spill過程當中的時候爲何不直接把這些信息存儲在內存中呢,何須又多了這步掃描的操做?特別是Spill的索引數據,以前當內存超限以後就把數據寫到磁盤,如今又要從磁盤把這些數據讀出來,仍是須要裝到更多的內存中。之因此畫蛇添足,是由於這時kvbuffer這個內存大戶已經再也不使用能夠回收,有內存空間來裝這些數據了。(對於內存空間較大的土豪來講,用內存來省卻這兩個io步驟仍是值得考慮的。)
而後爲merge過程建立一個叫file.out的文件和一個叫file.out.Index的文件用來存儲最終的輸出和索引。
一個partition一個partition的進行合併輸出。對於某個partition來講,從索引列表中查詢這個partition對應的全部索引信息,每一個對應一個段插入到段列表中。也就是這個partition對應一個段列表,記錄全部的Spill文件中對應的這個partition那段數據的文件名、起始位置、長度等等。
而後對這個partition對應的全部的segment進行合併,目標是合併成一個segment。當這個partition對應不少個segment時,會分批地進行合併:先從segment列表中把第一批取出來,以key爲關鍵字放置成最小堆,而後從最小堆中每次取出最小的<key, value>輸出到一個臨時文件中,這樣就把這一批段合併成一個臨時的段,把它加回到segment列表中;再從segment列表中把第二批取出來合併輸出到一個臨時segment,把其加入到列表中;這樣往復執行,直到剩下的段是一批,輸出到最終的文件中。
最終的索引數據仍然輸出到Index文件中。
Map端的Shuffle過程到此結束。
Reduce任務經過HTTP向各個Map任務拖取它所須要的數據。每一個節點都會啓動一個常駐的HTTP server,其中一項服務就是響應Reduce拖取Map數據。當有MapOutput的HTTP請求過來的時候,HTTP server就讀取相應的Map輸出文件中對應這個Reduce部分的數據經過網絡流輸出給Reduce。
Reduce任務拖取某個Map對應的數據,若是在內存中能放得下此次數據的話就直接把數據寫到內存中。Reduce要向每一個Map去拖取數據,在內存中每一個Map對應一塊數據,當內存中存儲的Map數據佔用空間達到必定程度的時候,開始啓動內存中merge,把內存中的數據merge輸出到磁盤上一個文件中。
若是在內存中不能放得下這個Map的數據的話,直接把Map數據寫到磁盤上,在本地目錄建立一個文件,從HTTP流中讀取數據而後寫到磁盤,使用的緩存區大小是64K。拖一個Map數據過來就會建立一個文件,當文件數量達到必定閾值時,開始啓動磁盤文件merge,把這些文件合併輸出到一個文件。
有些Map的數據較小是能夠放在內存中的,有些Map的數據較大須要放在磁盤上,這樣最後Reduce任務拖過來的數據有些放在內存中了有些放在磁盤上,最後會對這些來一個全局合併。
這裏使用的Merge和Map端使用的Merge過程同樣。Map的輸出數據已是有序的,Merge進行一次合併排序,所謂Reduce端的sort過程就是這個合併的過程。通常Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是徹底分開的。
Reduce端的Shuffle過程至此結束。
*************************************************************************************************************************************************************************************************************************
*************************************************************************************************************************************************************************************************************************
Spark豐富了任務類型,有些任務之間數據流轉不須要經過Shuffle,可是有些任務之間仍是須要經過Shuffle來傳遞數據,好比wide dependency的group by key。
Spark中須要Shuffle輸出的Map任務會爲每一個Reduce建立對應的bucket,Map產生的結果會根據設置的partitioner獲得對應的bucketId,而後填充到相應的bucket中去。每一個Map的輸出結果可能包含全部的Reduce所須要的數據,因此每一個Map會建立R個bucket(R是reduce的個數),M個Map總共會建立M*R個bucket。
Map建立的bucket其實對應磁盤上的一個文件,Map的結果寫到每一個bucket中其實就是寫到那個磁盤文件中,這個文件也被稱爲blockFile,是Disk Block Manager管理器經過文件名的Hash值對應到本地目錄的子目錄中建立的。每一個Map要在節點上建立R個磁盤文件用於結果輸出,Map的結果是直接輸出到磁盤文件上的,100KB的內存緩衝是用來建立Fast Buffered OutputStream輸出流。這種方式一個問題就是Shuffle文件過多。
針對上述Shuffle過程產生的文件過多問題,Spark有另一種改進的Shuffle過程:consolidation Shuffle,以期顯著減小Shuffle文件的數量。在consolidation Shuffle中每一個bucket並不是對應一個文件,而是對應文件中的一個segment部分。Job的map在某個節點上第一次執行,爲每一個reduce建立bucket對應的輸出文件,把這些文件組織成ShuffleFileGroup,當此次map執行完以後,這個ShuffleFileGroup能夠釋放爲下次循環利用;當又有map在這個節點上執行時,不須要建立新的bucket文件,而是在上次的ShuffleFileGroup中取得已經建立的文件繼續追加寫一個segment;當前次map還沒執行完,ShuffleFileGroup尚未釋放,這時若是有新的map在這個節點上執行,沒法循環利用這個ShuffleFileGroup,而是隻能建立新的bucket文件組成新的ShuffleFileGroup來寫輸出。
好比一個Job有3個Map和2個reduce:(1) 若是此時集羣有3個節點有空槽,每一個節點空閒了一個core,則3個Map會調度到這3個節點上執行,每一個Map都會建立2個Shuffle文件,總共建立6個Shuffle文件;(2) 若是此時集羣有2個節點有空槽,每一個節點空閒了一個core,則2個Map先調度到這2個節點上執行,每一個Map都會建立2個Shuffle文件,而後其中一個節點執行完Map以後又調度執行另外一個Map,則這個Map不會建立新的Shuffle文件,而是把結果輸出追加到以前Map建立的Shuffle文件中;總共建立4個Shuffle文件;(3) 若是此時集羣有2個節點有空槽,一個節點有2個空core一個節點有1個空core,則一個節點調度2個Map一個節點調度1個Map,調度2個Map的節點上,一個Map建立了Shuffle文件,後面的Map仍是會建立新的Shuffle文件,由於上一個Map還正在寫,它建立的ShuffleFileGroup尚未釋放;總共建立6個Shuffle文件。
Reduce去拖Map的輸出數據,Spark提供了兩套不一樣的拉取數據框架:經過socket鏈接去取數據;使用netty框架去取數據。
每一個節點的Executor會建立一個BlockManager,其中會建立一個BlockManagerWorker用於響應請求。當Reduce的GET_BLOCK的請求過來時,讀取本地文件將這個blockId的數據返回給Reduce。若是使用的是Netty框架,BlockManager會建立ShuffleSender用於發送Shuffle數據。
並非全部的數據都是經過網絡讀取,對於在本節點的Map數據,Reduce直接去磁盤上讀取而再也不經過網絡框架。
Reduce拖過來數據以後以什麼方式存儲呢?Spark Map輸出的數據沒有通過排序,Spark Shuffle過來的數據也不會進行排序,Spark認爲Shuffle過程當中的排序不是必須的,並非全部類型的Reduce須要的數據都須要排序,強制地進行排序只會增長Shuffle的負擔。Reduce拖過來的數據會放在一個HashMap中,HashMap中存儲的也是<key, value>對,key是Map輸出的key,Map輸出對應這個key的全部value組成HashMap的value。Spark將Shuffle取過來的每個<key, value>對插入或者更新到HashMap中,來一個處理一個。HashMap所有放在內存中。
Shuffle取過來的數據所有存放在內存中,對於數據量比較小或者已經在Map端作過合併處理的Shuffle數據,佔用內存空間不會太大,可是對於好比group by key這樣的操做,Reduce須要獲得key對應的全部value,並將這些value組一個數組放在內存中,這樣當數據量較大時,就須要較多內存。
當內存不夠時,要不就失敗,要不就用老辦法把內存中的數據移到磁盤上放着。Spark意識到在處理數據規模遠遠大於內存空間時所帶來的不足,引入了一個具備外部排序的方案。Shuffle過來的數據先放在內存中,當內存中存儲的<key, value>對超過1000而且內存使用超過70%時,判斷節點上可用內存若是還足夠,則把內存緩衝區大小翻倍,若是可用內存再也不夠了,則把內存中的<key, value>對排序而後寫到磁盤文件中。最後把內存緩衝區中的數據排序以後和那些磁盤文件組成一個最小堆,每次從最小堆中讀取最小的數據,這個和MapReduce中的merge過程相似。
MapReduce |
Spark |
|
collect |
在內存中構造了一塊數據結構用於map輸出的緩衝 |
沒有在內存中構造一塊數據結構用於map輸出的緩衝,而是直接把輸出寫到磁盤文件 |
sort |
map輸出的數據有排序 |
map輸出的數據沒有排序 |
merge |
對磁盤上的多個spill文件最後進行合併成一個輸出文件 |
在map端沒有merge過程,在輸出時直接是對應一個reduce的數據寫到一個文件中,這些文件同時存在併發寫,最後不須要合併成一個 |
copy框架 |
jetty |
netty或者直接socket流 |
對於本節點上的文件 |
仍然是經過網絡框架拖取數據 |
不經過網絡框架,對於在本節點上的map輸出文件,採用本地讀取的方式 |
copy過來的數據存放位置 |
先放在內存,內存放不下時寫到磁盤 |
一種方式所有放在內存; 另外一種方式先放在內存 |
merge sort |
最後會對磁盤文件和內存中的數據進行合併排序 |
對於採用另外一種方式時也會有合併排序的過程 |
經過上面的介紹,咱們瞭解到,Shuffle過程的主要存儲介質是磁盤,儘可能的減小IO是Shuffle的主要優化方向。咱們腦海中都有那個經典的存儲金字塔體系,Shuffle過程爲何把結果都放在磁盤上,那是由於如今內存再大也大不過磁盤,內存就那麼大,還這麼多張嘴吃,固然是分配給最須要的了。若是具備「土豪」內存節點,減小Shuffle IO的最有效方式無疑是儘可能把數據放在內存中。下面列舉一些如今看能夠優化的方面,期待通過咱們不斷的努力,TDW計算引擎運行地更好。
Spark做爲MapReduce的進階架構,對於Shuffle過程已是優化了的,特別是對於那些具備爭議的步驟已經作了優化,可是Spark的Shuffle對於咱們來講在一些方面仍是須要優化的。